}
in.discardReadBytes();
}
+
}
+
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-
-import java.util.Queue;
-
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.Queue;
+
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.netconf.api.NetconfMessage;
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageFactory;
import org.opendaylight.controller.netconf.util.messages.NetconfMessageHeader;
import org.opendaylight.controller.netconf.util.test.XmlFileLoader;
-import org.opendaylight.protocol.framework.ProtocolMessageDecoder;
-import org.opendaylight.protocol.framework.ProtocolMessageEncoder;
public class MessageParserTest {
private NetconfMessage msg;
- private NetconfMessageFactory msgFactory = new NetconfMessageFactory();
@Before
public void setUp() throws Exception {
public void testChunkedFramingMechanismOnPipeline() throws Exception {
EmbeddedChannel testChunkChannel = new EmbeddedChannel(
FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK),
- new ProtocolMessageEncoder<NetconfMessage>(msgFactory),
+ new NetconfMessageToXMLEncoder(),
new NetconfMessageAggregator(FramingMechanism.CHUNK), new NetconfMessageChunkDecoder(),
- new ProtocolMessageDecoder<NetconfMessage>(msgFactory));
+ new NetconfXMLToMessageDecoder());
testChunkChannel.writeOutbound(this.msg);
Queue<Object> messages = testChunkChannel.outboundMessages();
assertFalse(messages.isEmpty());
- int msgLength = this.msgFactory.put(this.msg).length;
+ final NetconfMessageToXMLEncoder enc = new NetconfMessageToXMLEncoder();
+ final ByteBuf out = Unpooled.buffer();
+ enc.encode(null, msg, out);
+ int msgLength = out.readableBytes();
+
int chunkCount = msgLength / NetconfMessageConstants.MAX_CHUNK_SIZE;
if ((msgLength % NetconfMessageConstants.MAX_CHUNK_SIZE) != 0) {
chunkCount++;
public void testEOMFramingMechanismOnPipeline() throws Exception {
EmbeddedChannel testChunkChannel = new EmbeddedChannel(
FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM),
- new ProtocolMessageEncoder<NetconfMessage>(msgFactory), new NetconfMessageAggregator(
- FramingMechanism.EOM), new ProtocolMessageDecoder<NetconfMessage>(msgFactory));
+ new NetconfMessageToXMLEncoder(), new NetconfMessageAggregator(
+ FramingMechanism.EOM), new NetconfXMLToMessageDecoder());
testChunkChannel.writeOutbound(this.msg);
ByteBuf recievedOutbound = (ByteBuf) testChunkChannel.readOutbound();
import org.opendaylight.controller.netconf.api.NetconfSession;
import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
-import org.opendaylight.controller.netconf.util.handler.NetconfHandlerFactory;
import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
+import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder;
+import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
public abstract class AbstractChannelInitializer {
public void initialize(SocketChannel ch, Promise<? extends NetconfSession> promise){
- NetconfHandlerFactory handlerFactory = new NetconfHandlerFactory();
ch.pipeline().addLast("aggregator", new NetconfMessageAggregator(FramingMechanism.EOM));
- ch.pipeline().addLast(handlerFactory.getDecoders());
+ ch.pipeline().addLast(new NetconfXMLToMessageDecoder());
initializeAfterDecoder(ch, promise);
ch.pipeline().addLast("frameEncoder", FramingMechanismHandlerFactory.createHandler(FramingMechanism.EOM));
- ch.pipeline().addLast(handlerFactory.getEncoders());
+ ch.pipeline().addLast(new NetconfMessageToXMLEncoder());
}
protected abstract void initializeAfterDecoder(SocketChannel ch, Promise<? extends NetconfSession> promise);
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.netconf.util.handler;
-
-import io.netty.channel.ChannelHandler;
-
-import org.opendaylight.controller.netconf.util.messages.NetconfMessageFactory;
-import org.opendaylight.protocol.framework.ProtocolMessageDecoder;
-import org.opendaylight.protocol.framework.ProtocolMessageEncoder;
-
-public class NetconfHandlerFactory {
- private final NetconfMessageFactory msgFactory = new NetconfMessageFactory();
-
- public ChannelHandler[] getEncoders() {
- return new ChannelHandler[] { new ProtocolMessageEncoder(this.msgFactory) };
- }
-
- public ChannelHandler[] getDecoders() {
- return new ChannelHandler[] { new ProtocolMessageDecoder(this.msgFactory) };
- }
-
-}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.util.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+import java.nio.ByteBuffer;
+
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Comment;
+import org.w3c.dom.Document;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+
+public final class NetconfMessageToXMLEncoder extends MessageToByteEncoder<NetconfMessage> {
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfMessageToXMLEncoder.class);
+
+ private final Optional<String> clientId;
+
+ public NetconfMessageToXMLEncoder() {
+ this(Optional.<String>absent());
+ }
+
+ public NetconfMessageToXMLEncoder(Optional<String> clientId) {
+ this.clientId = clientId;
+ }
+
+ @Override
+ @VisibleForTesting
+ public void encode(ChannelHandlerContext ctx, NetconfMessage msg, ByteBuf out) throws Exception {
+ LOG.debug("Sent to encode : {}", msg);
+
+ if (clientId.isPresent()) {
+ Comment comment = msg.getDocument().createComment("clientId:" + clientId.get());
+ msg.getDocument().appendChild(comment);
+ }
+
+ final ByteBuffer msgBytes;
+ if(msg.getAdditionalHeader().isPresent()) {
+ final String header = msg.getAdditionalHeader().get();
+ LOG.trace("Header of netconf message parsed \n{}", header);
+ // FIXME: this can be written in pieces
+ msgBytes = Charsets.UTF_8.encode(header + xmlToString(msg.getDocument()));
+ } else {
+ msgBytes = Charsets.UTF_8.encode(xmlToString(msg.getDocument()));
+ }
+
+ LOG.trace("Putting message \n{}", xmlToString(msg.getDocument()));
+ out.writeBytes(msgBytes);
+ }
+
+ private String xmlToString(Document doc) {
+ return XmlUtil.toString(doc, false);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.netconf.util.handler;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.opendaylight.controller.netconf.api.NetconfDeserializerException;
+import org.opendaylight.controller.netconf.api.NetconfMessage;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.xml.sax.SAXException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableList;
+
+public final class NetconfXMLToMessageDecoder extends ByteToMessageDecoder {
+ private static final Logger LOG = LoggerFactory.getLogger(NetconfXMLToMessageDecoder.class);
+
+ // FIXME: this is funky way of creating arrays
+ private static final List<byte[]> POSSIBLE_ENDS = ImmutableList.of(
+ "]\n".getBytes(Charsets.UTF_8), "]\r\n".getBytes(Charsets.UTF_8));
+ private static final List<byte[]> POSSIBLE_STARTS = ImmutableList.of(
+ "[".getBytes(Charsets.UTF_8), "\r\n[".getBytes(Charsets.UTF_8), "\n[".getBytes(Charsets.UTF_8));
+
+ @Override
+ @VisibleForTesting
+ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+ if (in.readableBytes() == 0) {
+ LOG.debug("No more content in incoming buffer.");
+ return;
+ }
+
+ in.markReaderIndex();
+ try {
+ LOG.trace("Received to decode: {}", ByteBufUtil.hexDump(in));
+ byte[] bytes = new byte[in.readableBytes()];
+ in.readBytes(bytes);
+
+ logMessage(bytes);
+
+ String additionalHeader = null;
+
+ if (startsWithAdditionalHeader(bytes)) {
+ // Auth information containing username, ip address... extracted for monitoring
+ int endOfAuthHeader = getAdditionalHeaderEndIndex(bytes);
+ if (endOfAuthHeader > -1) {
+ byte[] additionalHeaderBytes = Arrays.copyOfRange(bytes, 0, endOfAuthHeader + 2);
+ additionalHeader = additionalHeaderToString(additionalHeaderBytes);
+ bytes = Arrays.copyOfRange(bytes, endOfAuthHeader + 2, bytes.length);
+ }
+ }
+ NetconfMessage message;
+ try {
+ Document doc = XmlUtil.readXmlToDocument(new ByteArrayInputStream(bytes));
+ message = new NetconfMessage(doc, additionalHeader);
+ } catch (final SAXException | IOException | IllegalStateException e) {
+ throw new NetconfDeserializerException("Could not parse message from " + new String(bytes), e);
+ }
+
+ out.add(message);
+ } finally {
+ in.discardReadBytes();
+ }
+ }
+
+ private int getAdditionalHeaderEndIndex(byte[] bytes) {
+ for (byte[] possibleEnd : POSSIBLE_ENDS) {
+ int idx = findByteSequence(bytes, possibleEnd);
+
+ if (idx != -1) {
+ return idx;
+ }
+ }
+
+ return -1;
+ }
+
+ private static int findByteSequence(final byte[] bytes, final byte[] sequence) {
+ if (bytes.length < sequence.length) {
+ throw new IllegalArgumentException("Sequence to be found is longer than the given byte array.");
+ }
+ if (bytes.length == sequence.length) {
+ if (Arrays.equals(bytes, sequence)) {
+ return 0;
+ } else {
+ return -1;
+ }
+ }
+ int j = 0;
+ for (int i = 0; i < bytes.length; i++) {
+ if (bytes[i] == sequence[j]) {
+ j++;
+ if (j == sequence.length) {
+ return i - j + 1;
+ }
+ } else {
+ j = 0;
+ }
+ }
+ return -1;
+ }
+
+ private boolean startsWithAdditionalHeader(byte[] bytes) {
+ for (byte[] possibleStart : POSSIBLE_STARTS) {
+ int i = 0;
+ for (byte b : possibleStart) {
+ if(bytes[i] != b)
+ break;
+
+ return true;
+ }
+ }
+
+ return false;
+ };
+
+ private void logMessage(byte[] bytes) {
+ String s = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
+ LOG.debug("Parsing message \n{}", s);
+ }
+
+ private String additionalHeaderToString(byte[] bytes) {
+ return Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
+ }
+
+}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.netconf.util.messages;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.opendaylight.controller.netconf.api.NetconfDeserializerException;
-import org.opendaylight.controller.netconf.api.NetconfMessage;
-import org.opendaylight.controller.netconf.util.xml.XmlUtil;
-import org.opendaylight.protocol.framework.DeserializerException;
-import org.opendaylight.protocol.framework.DocumentedException;
-import org.opendaylight.protocol.framework.ProtocolMessageFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Comment;
-import org.w3c.dom.Document;
-import org.xml.sax.SAXException;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableList;
-
-/**
- * NetconfMessageFactory for (de)serializing DOM documents.
- */
-public final class NetconfMessageFactory implements ProtocolMessageFactory<NetconfMessage> {
-
- private static final Logger logger = LoggerFactory.getLogger(NetconfMessageFactory.class);
- private static final List<byte[]> POSSIBLE_STARTS = ImmutableList.of(
- "[".getBytes(Charsets.UTF_8), "\r\n[".getBytes(Charsets.UTF_8), "\n[".getBytes(Charsets.UTF_8));
- private static final List<byte[]> POSSIBLE_ENDS = ImmutableList.of(
- "]\n".getBytes(Charsets.UTF_8), "]\r\n".getBytes(Charsets.UTF_8));
-
- private final Optional<String> clientId;
-
- public NetconfMessageFactory() {
- clientId = Optional.absent();
- }
-
- public NetconfMessageFactory(Optional<String> clientId) {
- this.clientId = clientId;
- }
-
- @Override
- public NetconfMessage parse(byte[] bytes) throws DeserializerException, DocumentedException {
- logMessage(bytes);
-
- String additionalHeader = null;
-
- if (startsWithAdditionalHeader(bytes)) {
- // Auth information containing username, ip address... extracted for monitoring
- int endOfAuthHeader = getAdditionalHeaderEndIndex(bytes);
- if (endOfAuthHeader > -1) {
- byte[] additionalHeaderBytes = Arrays.copyOfRange(bytes, 0, endOfAuthHeader + 2);
- additionalHeader = additionalHeaderToString(additionalHeaderBytes);
- bytes = Arrays.copyOfRange(bytes, endOfAuthHeader + 2, bytes.length);
- }
- }
- NetconfMessage message;
- try {
- Document doc = XmlUtil.readXmlToDocument(new ByteArrayInputStream(bytes));
- message = new NetconfMessage(doc, additionalHeader);
- } catch (final SAXException | IOException | IllegalStateException e) {
- throw new NetconfDeserializerException("Could not parse message from " + new String(bytes), e);
- }
- return message;
- }
-
- private static int findByteSequence(final byte[] bytes, final byte[] sequence) {
- if (bytes.length < sequence.length) {
- throw new IllegalArgumentException("Sequence to be found is longer than the given byte array.");
- }
- if (bytes.length == sequence.length) {
- if (Arrays.equals(bytes, sequence)) {
- return 0;
- } else {
- return -1;
- }
- }
- int j = 0;
- for (int i = 0; i < bytes.length; i++) {
- if (bytes[i] == sequence[j]) {
- j++;
- if (j == sequence.length) {
- return i - j + 1;
- }
- } else {
- j = 0;
- }
- }
- return -1;
- }
-
- private int getAdditionalHeaderEndIndex(byte[] bytes) {
- for (byte[] possibleEnd : POSSIBLE_ENDS) {
- int idx = findByteSequence(bytes, possibleEnd);
-
- if (idx != -1) {
- return idx;
- }
- }
-
- return -1;
- }
-
- private boolean startsWithAdditionalHeader(byte[] bytes) {
- for (byte[] possibleStart : POSSIBLE_STARTS) {
- int i = 0;
- for (byte b : possibleStart) {
- if(bytes[i] != b)
- break;
-
- return true;
- }
- }
-
- return false;
- };
-
- private void logMessage(byte[] bytes) {
- String s = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
- logger.debug("Parsing message \n{}", s);
- }
-
- private String additionalHeaderToString(byte[] bytes) {
- return Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
- }
-
- @Override
- public byte[] put(NetconfMessage netconfMessage) {
- if (clientId.isPresent()) {
- Comment comment = netconfMessage.getDocument().createComment("clientId:" + clientId.get());
- netconfMessage.getDocument().appendChild(comment);
- }
- ByteBuffer msgBytes;
- if(netconfMessage.getAdditionalHeader().isPresent()) {
- String header = netconfMessage.getAdditionalHeader().get();
- logger.trace("Header of netconf message parsed \n{}", header);
- msgBytes = Charsets.UTF_8.encode(header + xmlToString(netconfMessage.getDocument()));
- } else {
- msgBytes = Charsets.UTF_8.encode(xmlToString(netconfMessage.getDocument()));
- }
- String content = xmlToString(netconfMessage.getDocument());
-
- logger.trace("Putting message \n{}", content);
- byte[] b = new byte[msgBytes.limit()];
- msgBytes.get(b);
- return b;
- }
-
- private String xmlToString(Document doc) {
- return XmlUtil.toString(doc, false);
- }
-}
*/
package org.opendaylight.controller.netconf.util.messages;
-import com.google.common.io.Files;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import io.netty.buffer.Unpooled;
import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
-public class NetconfMessageFactoryTest {
+import org.junit.Test;
+import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder;
+import com.google.common.io.Files;
+public class NetconfMessageFactoryTest {
@Test
public void testAuth() throws Exception {
- NetconfMessageFactory parser = new NetconfMessageFactory();
+ NetconfXMLToMessageDecoder parser = new NetconfXMLToMessageDecoder();
File authHelloFile = new File(getClass().getResource("/netconfMessages/client_hello_with_auth.xml").getFile());
- parser.parse(Files.toByteArray(authHelloFile));
+ final List<Object> out = new ArrayList<>();
+ parser.decode(null, Unpooled.wrappedBuffer(Files.toByteArray(authHelloFile)), out);
+ assertEquals(1, out.size());
}
}