import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.util.handler.ChunkedFramingMechanismEncoder;
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator;
+import org.opendaylight.controller.netconf.util.handler.NetconfEOMAggregator;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder;
import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK),
new NetconfMessageToXMLEncoder(),
- new NetconfMessageAggregator(FramingMechanism.CHUNK), new NetconfMessageChunkDecoder(),
+ new NetconfChunkAggregator(),
new NetconfXMLToMessageDecoder());
testChunkChannel.writeOutbound(this.msg);
public void testEOMFramingMechanismOnPipeline() throws Exception {
EmbeddedChannel testChunkChannel = new EmbeddedChannel(
FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM),
- new NetconfMessageToXMLEncoder(), new NetconfMessageAggregator(
- FramingMechanism.EOM), new NetconfXMLToMessageDecoder());
+ new NetconfMessageToXMLEncoder(), new NetconfEOMAggregator(), new NetconfXMLToMessageDecoder());
testChunkChannel.writeOutbound(this.msg);
ByteBuf recievedOutbound = (ByteBuf) testChunkChannel.readOutbound();
- byte[] eom = new byte[NetconfMessageConstants.endOfMessage.length];
- recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.endOfMessage.length, eom);
- assertArrayEquals(NetconfMessageConstants.endOfMessage, eom);
+ byte[] eom = new byte[NetconfMessageConstants.END_OF_MESSAGE.length];
+ recievedOutbound.getBytes(recievedOutbound.readableBytes() - NetconfMessageConstants.END_OF_MESSAGE.length, eom);
+ assertArrayEquals(NetconfMessageConstants.END_OF_MESSAGE, eom);
testChunkChannel.writeInbound(recievedOutbound);
NetconfMessage receivedMessage = (NetconfMessage) testChunkChannel.readInbound();
import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
+import org.opendaylight.controller.netconf.util.handler.NetconfEOMAggregator;
import org.opendaylight.controller.netconf.util.handler.NetconfHelloMessageToXMLEncoder;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
public static final String NETCONF_SESSION_NEGOTIATOR = "negotiator";
public void initialize(SocketChannel ch, Promise<S> promise) {
- ch.pipeline().addLast(NETCONF_MESSAGE_AGGREGATOR, new NetconfMessageAggregator(FramingMechanism.EOM));
+ ch.pipeline().addLast(NETCONF_MESSAGE_AGGREGATOR, new NetconfEOMAggregator());
initializeMessageDecoder(ch);
ch.pipeline().addLast(NETCONF_MESSAGE_FRAME_ENCODER, FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM));
initializeMessageEncoder(ch);
package org.opendaylight.controller.netconf.util;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.Promise;
+
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.netconf.api.AbstractNetconfSession;
import org.opendaylight.controller.netconf.api.NetconfSessionListener;
import org.opendaylight.controller.netconf.api.NetconfSessionPreferences;
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
-import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
-import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder;
import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
+import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
import org.opendaylight.controller.netconf.util.xml.XmlUtil;
import org.opendaylight.protocol.framework.AbstractSessionNegotiator;
import org.slf4j.Logger;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.Timeout;
-import io.netty.util.Timer;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.Promise;
-
public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionPreferences, S extends AbstractNetconfSession<S, L>, L extends NetconfSessionListener<S>>
extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class);
public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler";
- public static final String CHUNK_DECODER_CHANNEL_HANDLER_KEY = "chunkDecoder";
protected final P sessionPreferences;
}
changeState(State.ESTABLISHED);
- S session = getSession(sessionListener, channel, (NetconfHelloMessage)netconfMessage);
+ S session = getSession(sessionListener, channel, netconfMessage);
negotiationSuccessful(session);
} else {
replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER,
FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
- new NetconfMessageAggregator(FramingMechanism.CHUNK));
- channel.pipeline().addAfter(AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
- CHUNK_DECODER_CHANNEL_HANDLER_KEY, new NetconfMessageChunkDecoder());
+ new NetconfChunkAggregator());
}
private boolean shouldUseChunkFraming(Document doc) {
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
out.writeBytes(msg);
- out.writeBytes(NetconfMessageConstants.endOfMessage);
+ out.writeBytes(NetconfMessageConstants.END_OF_MESSAGE);
}
}
--- /dev/null
+/*
+ * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.netconf.util.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetconfChunkAggregator extends ByteToMessageDecoder {
+ private final static Logger logger = LoggerFactory.getLogger(NetconfChunkAggregator.class);
+ public static final int DEFAULT_MAXIMUM_CHUNK_SIZE = 16 * 1024 * 1024;
+
+ private static enum State {
+ HEADER_ONE, // \n
+ HEADER_TWO, // #
+ HEADER_LENGTH_FIRST, // [1-9]
+ HEADER_LENGTH_OTHER, // [0-9]*\n
+ DATA,
+ FOOTER_ONE, // \n
+ FOOTER_TWO, // #
+ FOOTER_THREE, // #
+ FOOTER_FOUR, // \n
+ }
+
+ private final int maxChunkSize = DEFAULT_MAXIMUM_CHUNK_SIZE;
+ private State state = State.HEADER_ONE;
+ private long chunkSize;
+ private ByteBuf chunk;
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+ while (in.isReadable()) {
+ switch (state) {
+ case HEADER_ONE:
+ {
+ final byte b = in.readByte();
+ if (b != '\n') {
+ throw new IllegalStateException("Malformed chunk header encountered (byte 0)");
+ }
+
+ state = State.HEADER_TWO;
+ break;
+ }
+ case HEADER_TWO:
+ {
+ final byte b = in.readByte();
+ if (b != '#') {
+ throw new IllegalStateException("Malformed chunk header encountered (byte 1)");
+ }
+
+ state = State.HEADER_LENGTH_FIRST;
+ break;
+ }
+ case HEADER_LENGTH_FIRST:
+ {
+ final byte b = in.readByte();
+ if (b < '1' || b > '9') {
+ throw new IllegalStateException("Invalid chunk size encountered (byte 0)");
+ }
+
+ chunkSize = b - '0';
+ state = State.HEADER_LENGTH_OTHER;
+ break;
+ }
+ case HEADER_LENGTH_OTHER:
+ {
+ final byte b = in.readByte();
+ if (b == '\n') {
+ state = State.DATA;
+ break;
+ }
+
+ if (b < '0' || b > '9') {
+ throw new IllegalStateException("Invalid chunk size encountered");
+ }
+
+ chunkSize *= 10;
+ chunkSize += b - '0';
+
+ if (chunkSize > maxChunkSize) {
+ throw new IllegalStateException("Maximum chunk size exceeded");
+ }
+ break;
+ }
+ case DATA:
+ /*
+ * FIXME: this gathers all data into one big chunk before passing
+ * it on. Make sure the pipeline can work with partial data
+ * and then change this piece to pass the data on as it
+ * comes through.
+ */
+ if (in.readableBytes() < chunkSize) {
+ logger.debug("Buffer has {} bytes, need {} to complete chunk", in.readableBytes(), chunkSize);
+ in.discardReadBytes();
+ return;
+ }
+
+ chunk = in.readBytes((int)chunkSize);
+ state = State.FOOTER_ONE;
+ break;
+ case FOOTER_ONE:
+ {
+ final byte b = in.readByte();
+ if (b != '\n') {
+ logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
+ throw new IllegalStateException("Malformed chunk footer encountered (byte 0)");
+ }
+
+ state = State.FOOTER_TWO;
+ break;
+ }
+ case FOOTER_TWO:
+ {
+ final byte b = in.readByte();
+ if (b != '#') {
+ logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
+ throw new IllegalStateException("Malformed chunk footer encountered (byte 1)");
+ }
+
+ state = State.FOOTER_THREE;
+ break;
+ }
+ case FOOTER_THREE:
+ {
+ final byte b = in.readByte();
+ if (b != '#') {
+ logger.debug("Got byte {} while waiting for {}", b, (byte)'#');
+ throw new IllegalStateException("Malformed chunk footer encountered (byte 2)");
+ }
+
+ state = State.FOOTER_FOUR;
+ break;
+ }
+ case FOOTER_FOUR:
+ {
+ final byte b = in.readByte();
+ if (b != '\n') {
+ logger.debug("Got byte {} while waiting for {}", b, (byte)'\n');
+ throw new IllegalStateException("Malformed chunk footer encountered (byte 3)");
+ }
+
+ state = State.HEADER_ONE;
+ out.add(chunk);
+ chunkSize = 0;
+ chunk = null;
+ break;
+ }
+ }
+ }
+
+ in.discardReadBytes();
+ }
+}
package org.opendaylight.controller.netconf.util.handler;
-import com.google.common.base.Charsets;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
-import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
-public class NetconfMessageAggregator extends ByteToMessageDecoder {
-
- private final static Logger logger = LoggerFactory.getLogger(NetconfMessageAggregator.class);
+import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
- private byte[] eom = NetconfMessageConstants.endOfMessage;
+import com.google.common.base.Charsets;
- public NetconfMessageAggregator(FramingMechanism framingMechanism) {
- if (framingMechanism == FramingMechanism.CHUNK) {
- eom = NetconfMessageConstants.endOfChunk;
- }
- }
+public class NetconfEOMAggregator extends ByteToMessageDecoder {
+ private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class);
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- int index = indexOfSequence(in, eom);
+ int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE);
if (index == -1) {
logger.debug("Message is not complete, read again.");
if (logger.isTraceEnabled()) {
ctx.read();
} else {
ByteBuf msg = in.readBytes(index);
- in.readBytes(eom.length);
+ in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length);
in.discardReadBytes();
logger.debug("Message is complete.");
out.add(msg);
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.util.handler;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-
-import org.opendaylight.controller.netconf.api.NetconfDeserializerException;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Charsets;
-
-public class NetconfMessageChunkDecoder extends ByteToMessageDecoder {
- private final static Logger logger = LoggerFactory.getLogger(NetconfMessageChunkDecoder.class);
-
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
- ByteBuf byteBufMsg = Unpooled.buffer(in.readableBytes());
- int chunkSize = -1;
- boolean isParsed = false;
- while (in.isReadable()) {
- try {
- if (!isParsed) {
- chunkSize = readHeader(in);
- isParsed = true;
- }
- if (chunkSize != -1 && isParsed) {
- in.readBytes(byteBufMsg, chunkSize);
- isParsed = false;
- } else {
- throw new NetconfDeserializerException("Unable to parse chunked data or header.");
- }
- } catch (Exception e) {
- logger.error("Failed to decode chunked message.", e);
- this.exceptionCaught(ctx, e);
- }
- }
- out.add(byteBufMsg);
- isParsed = false;
- }
-
- private int readHeader(ByteBuf in) {
- ByteBuf chunkSize = Unpooled.buffer(NetconfMessageConstants.MIN_HEADER_LENGTH,
- NetconfMessageConstants.MAX_HEADER_LENGTH);
- byte b = in.readByte();
- if (b != 10)
- return -1;
- b = in.readByte();
- if (b != 35)
- return -1;
- while ((b = in.readByte()) != 10) {
- chunkSize.writeByte(b);
- }
- return Integer.parseInt(chunkSize.toString(Charsets.US_ASCII));
- }
-
-}
import com.google.common.base.Charsets;
public class NetconfMessageConstants {
-
- public static final byte[] endOfMessage = "]]>]]>".getBytes(Charsets.UTF_8);
+ /**
+ * The NETCONF 1.0 old-style message separator. This is framing mechanism
+ * is used by default.
+ */
+ public static final byte[] END_OF_MESSAGE = "]]>]]>".getBytes(Charsets.UTF_8);
public static final byte[] endOfChunk = "\n##\n".getBytes(Charsets.UTF_8);
public static final int MIN_HEADER_LENGTH = 4; // bytes
public static final int MAX_HEADER_LENGTH = 13; // bytes
-}
\ No newline at end of file
+}