From: Tony Tkacik Date: Tue, 18 Feb 2014 10:59:36 +0000 (+0000) Subject: Merge "Split message aggregators" X-Git-Tag: autorelease-tag-v20140601202136_82eb3f9~415 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=commitdiff_plain;h=605ec74419f5de738d1689203e7bdd798c82f7af;hp=f9f75adef63629909986f8cefabc33144bfba74c;p=controller.git Merge "Split message aggregators" --- diff --git a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/MessageParserTest.java b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/MessageParserTest.java index 0138cf2bcb..8f9d89f1f1 100644 --- a/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/MessageParserTest.java +++ b/opendaylight/netconf/netconf-impl/src/test/java/org/opendaylight/controller/netconf/impl/MessageParserTest.java @@ -24,8 +24,8 @@ import org.junit.Test; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.util.handler.ChunkedFramingMechanismEncoder; import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder; +import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator; +import org.opendaylight.controller.netconf.util.handler.NetconfEOMAggregator; import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder; import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder; import org.opendaylight.controller.netconf.util.messages.FramingMechanism; @@ -48,7 +48,7 @@ public class MessageParserTest { FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK), new NetconfMessageToXMLEncoder(), - new NetconfMessageAggregator(FramingMechanism.CHUNK), new NetconfMessageChunkDecoder(), + new NetconfChunkAggregator(), new NetconfXMLToMessageDecoder()); testChunkChannel.writeOutbound(this.msg); @@ -94,15 +94,14 @@ public class MessageParserTest { public void testEOMFramingMechanismOnPipeline() throws Exception { EmbeddedChannel testChunkChannel = new EmbeddedChannel( FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM), - new NetconfMessageToXMLEncoder(), new NetconfMessageAggregator( - FramingMechanism.EOM), new NetconfXMLToMessageDecoder()); + new NetconfMessageToXMLEncoder(), new NetconfEOMAggregator(), new NetconfXMLToMessageDecoder()); testChunkChannel.writeOutbound(this.msg); ByteBuf recievedOutbound = (ByteBuf) testChunkChannel.readOutbound(); - byte[] eom = new byte[NetconfMessageConstants.endOfMessage.length]; - recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.endOfMessage.length, eom); - assertArrayEquals(NetconfMessageConstants.endOfMessage, eom); + byte[] eom = new byte[NetconfMessageConstants.END_OF_MESSAGE.length]; + recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.END_OF_MESSAGE.length, eom); + assertArrayEquals(NetconfMessageConstants.END_OF_MESSAGE, eom); testChunkChannel.writeInbound(recievedOutbound); NetconfMessage receivedMessage = (NetconfMessage) testChunkChannel.readInbound(); diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java index 0910d9403a..48a45845a4 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractChannelInitializer.java @@ -13,8 +13,8 @@ import io.netty.util.concurrent.Promise; import org.opendaylight.controller.netconf.api.NetconfSession; import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory; +import org.opendaylight.controller.netconf.util.handler.NetconfEOMAggregator; import org.opendaylight.controller.netconf.util.handler.NetconfHelloMessageToXMLEncoder; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator; import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.controller.netconf.util.messages.FramingMechanism; @@ -27,7 +27,7 @@ public abstract class AbstractChannelInitializer { public static final String NETCONF_SESSION_NEGOTIATOR = "negotiator"; public void initialize(SocketChannel ch, Promise promise) { - ch.pipeline().addLast(NETCONF_MESSAGE_AGGREGATOR, new NetconfMessageAggregator(FramingMechanism.EOM)); + ch.pipeline().addLast(NETCONF_MESSAGE_AGGREGATOR, new NetconfEOMAggregator()); initializeMessageDecoder(ch); ch.pipeline().addLast(NETCONF_MESSAGE_FRAME_ENCODER, FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM)); initializeMessageEncoder(ch); diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java index 9986b82bd8..ea9fc5dce4 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java @@ -8,6 +8,17 @@ package org.opendaylight.controller.netconf.util; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.Promise; + import java.util.concurrent.TimeUnit; import org.opendaylight.controller.netconf.api.AbstractNetconfSession; @@ -15,12 +26,11 @@ import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfSessionListener; import org.opendaylight.controller.netconf.api.NetconfSessionPreferences; import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory; -import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder; +import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator; import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder; import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder; import org.opendaylight.controller.netconf.util.messages.FramingMechanism; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.protocol.framework.AbstractSessionNegotiator; import org.slf4j.Logger; @@ -31,23 +41,11 @@ import org.w3c.dom.NodeList; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.Timeout; -import io.netty.util.Timer; -import io.netty.util.TimerTask; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.Promise; - public abstract class AbstractNetconfSessionNegotiator

, L extends NetconfSessionListener> extends AbstractSessionNegotiator { private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class); public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler"; - public static final String CHUNK_DECODER_CHANNEL_HANDLER_KEY = "chunkDecoder"; protected final P sessionPreferences; @@ -162,7 +160,7 @@ extends AbstractSessionNegotiator { } changeState(State.ESTABLISHED); - S session = getSession(sessionListener, channel, (NetconfHelloMessage)netconfMessage); + S session = getSession(sessionListener, channel, netconfMessage); negotiationSuccessful(session); } else { @@ -180,9 +178,7 @@ extends AbstractSessionNegotiator { replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER, FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK)); replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR, - new NetconfMessageAggregator(FramingMechanism.CHUNK)); - channel.pipeline().addAfter(AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR, - CHUNK_DECODER_CHANNEL_HANDLER_KEY, new NetconfMessageChunkDecoder()); + new NetconfChunkAggregator()); } private boolean shouldUseChunkFraming(Document doc) { diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java index 383cebb04f..a3efe8a16b 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/EOMFramingMechanismEncoder.java @@ -18,6 +18,6 @@ public class EOMFramingMechanismEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { out.writeBytes(msg); - out.writeBytes(NetconfMessageConstants.endOfMessage); + out.writeBytes(NetconfMessageConstants.END_OF_MESSAGE); } } diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java new file mode 100644 index 0000000000..cee8e7133a --- /dev/null +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfChunkAggregator.java @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.netconf.util.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NetconfChunkAggregator extends ByteToMessageDecoder { + private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class); + public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024; + + private static enum State { + HEADER_ONE, // \n + HEADER_TWO, // # + HEADER_LENGTH_FIRST, // [1-9] + HEADER_LENGTH_OTHER, // [0-9]*\n + DATA, + FOOTER_ONE, // \n + FOOTER_TWO, // # + FOOTER_THREE, // # + FOOTER_FOUR, // \n + } + + private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE; + private State state = State.HEADER_ONE; + private long chunkSize; + private ByteBuf chunk; + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + while (in.isReadable()) { + switch (state) { + case HEADER_ONE: + { + final byte b = in.readByte(); + if (b != '\n') { + throw new IllegalStateException("Malformed chunk header encountered (byte 0)"); + } + + state = State.HEADER_TWO; + break; + } + case HEADER_TWO: + { + final byte b = in.readByte(); + if (b != '#') { + throw new IllegalStateException("Malformed chunk header encountered (byte 1)"); + } + + state = State.HEADER_LENGTH_FIRST; + break; + } + case HEADER_LENGTH_FIRST: + { + final byte b = in.readByte(); + if (b < '1' || b > '9') { + throw new IllegalStateException("Invalid chunk size encountered (byte 0)"); + } + + chunkSize = b - '0'; + state = State.HEADER_LENGTH_OTHER; + break; + } + case HEADER_LENGTH_OTHER: + { + final byte b = in.readByte(); + if (b == '\n') { + state = State.DATA; + break; + } + + if (b < '0' || b > '9') { + throw new IllegalStateException("Invalid chunk size encountered"); + } + + chunkSize *= 10; + chunkSize += b - '0'; + + if (chunkSize > maxChunkSize) { + throw new IllegalStateException("Maximum chunk size exceeded"); + } + break; + } + case DATA: + /* + * FIXME: this gathers all data into one big chunk before passing + * it on. Make sure the pipeline can work with partial data + * and then change this piece to pass the data on as it + * comes through. + */ + if (in.readableBytes() < chunkSize) { + logger.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize); + in.discardReadBytes(); + return; + } + + chunk = in.readBytes((int)chunkSize); + state = State.FOOTER_ONE; + break; + case FOOTER_ONE: + { + final byte b = in.readByte(); + if (b != '\n') { + logger.debug("Got byte {} while waiting for {}", b, (byte)'\n'); + throw new IllegalStateException("Malformed chunk footer encountered (byte 0)"); + } + + state = State.FOOTER_TWO; + break; + } + case FOOTER_TWO: + { + final byte b = in.readByte(); + if (b != '#') { + logger.debug("Got byte {} while waiting for {}", b, (byte)'#'); + throw new IllegalStateException("Malformed chunk footer encountered (byte 1)"); + } + + state = State.FOOTER_THREE; + break; + } + case FOOTER_THREE: + { + final byte b = in.readByte(); + if (b != '#') { + logger.debug("Got byte {} while waiting for {}", b, (byte)'#'); + throw new IllegalStateException("Malformed chunk footer encountered (byte 2)"); + } + + state = State.FOOTER_FOUR; + break; + } + case FOOTER_FOUR: + { + final byte b = in.readByte(); + if (b != '\n') { + logger.debug("Got byte {} while waiting for {}", b, (byte)'\n'); + throw new IllegalStateException("Malformed chunk footer encountered (byte 3)"); + } + + state = State.HEADER_ONE; + out.add(chunk); + chunkSize = 0; + chunk = null; + break; + } + } + } + + in.discardReadBytes(); + } +} diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageAggregator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfEOMAggregator.java similarity index 79% rename from opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageAggregator.java rename to opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfEOMAggregator.java index a2486050f9..9435e6ff73 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageAggregator.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfEOMAggregator.java @@ -8,32 +8,24 @@ package org.opendaylight.controller.netconf.util.handler; -import com.google.common.base.Charsets; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; -import org.opendaylight.controller.netconf.util.messages.FramingMechanism; -import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; -public class NetconfMessageAggregator extends ByteToMessageDecoder { - - private final static Logger logger = LoggerFactory.getLogger(NetconfMessageAggregator.class); +import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - private byte[] eom = NetconfMessageConstants.endOfMessage; +import com.google.common.base.Charsets; - public NetconfMessageAggregator(FramingMechanism framingMechanism) { - if (framingMechanism == FramingMechanism.CHUNK) { - eom = NetconfMessageConstants.endOfChunk; - } - } +public class NetconfEOMAggregator extends ByteToMessageDecoder { + private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class); @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - int index = indexOfSequence(in, eom); + int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE); if (index == -1) { logger.debug("Message is not complete, read again."); if (logger.isTraceEnabled()) { @@ -43,7 +35,7 @@ public class NetconfMessageAggregator extends ByteToMessageDecoder { ctx.read(); } else { ByteBuf msg = in.readBytes(index); - in.readBytes(eom.length); + in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length); in.discardReadBytes(); logger.debug("Message is complete."); out.add(msg); diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java deleted file mode 100644 index 77ef3861d2..0000000000 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfMessageChunkDecoder.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ - -package org.opendaylight.controller.netconf.util.handler; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; - -import java.util.List; - -import org.opendaylight.controller.netconf.api.NetconfDeserializerException; -import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Charsets; - -public class NetconfMessageChunkDecoder extends ByteToMessageDecoder { - private final static Logger logger = LoggerFactory.getLogger(NetconfMessageChunkDecoder.class); - - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { - ByteBuf byteBufMsg = Unpooled.buffer(in.readableBytes()); - int chunkSize = -1; - boolean isParsed = false; - while (in.isReadable()) { - try { - if (!isParsed) { - chunkSize = readHeader(in); - isParsed = true; - } - if (chunkSize != -1 && isParsed) { - in.readBytes(byteBufMsg, chunkSize); - isParsed = false; - } else { - throw new NetconfDeserializerException("Unable to parse chunked data or header."); - } - } catch (Exception e) { - logger.error("Failed to decode chunked message.", e); - this.exceptionCaught(ctx, e); - } - } - out.add(byteBufMsg); - isParsed = false; - } - - private int readHeader(ByteBuf in) { - ByteBuf chunkSize = Unpooled.buffer(NetconfMessageConstants.MIN_HEADER_LENGTH, - NetconfMessageConstants.MAX_HEADER_LENGTH); - byte b = in.readByte(); - if (b != 10) - return -1; - b = in.readByte(); - if (b != 35) - return -1; - while ((b = in.readByte()) != 10) { - chunkSize.writeByte(b); - } - return Integer.parseInt(chunkSize.toString(Charsets.US_ASCII)); - } - -} diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java index 1a08f9063f..c111998e0a 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/messages/NetconfMessageConstants.java @@ -11,12 +11,15 @@ package org.opendaylight.controller.netconf.util.messages; import com.google.common.base.Charsets; public class NetconfMessageConstants { - - public static final byte[] endOfMessage = "]]>]]>".getBytes(Charsets.UTF_8); + /** + * The NETCONF 1.0 old-style message separator. This is framing mechanism + * is used by default. + */ + public static final byte[] END_OF_MESSAGE = "]]>]]>".getBytes(Charsets.UTF_8); public static final byte[] endOfChunk = "\n##\n".getBytes(Charsets.UTF_8); public static final int MIN_HEADER_LENGTH = 4; // bytes public static final int MAX_HEADER_LENGTH = 13; // bytes -} \ No newline at end of file +}