X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Futil%2Fhandler%2FNetconfXMLToHelloMessageDecoder.java;h=361d4fcee908018eac90a54844569ced70c22a21;hp=42586a5ecc60433d28a8fa89bbe57d3e958fdee1;hb=e159106bc148e76fc1e3e3c780bdd740d99e74ed;hpb=aeabf761ca043e41eeca6333bc9deb94b1de9ed0 diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfXMLToHelloMessageDecoder.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfXMLToHelloMessageDecoder.java index 42586a5ecc..361d4fcee9 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfXMLToHelloMessageDecoder.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/handler/NetconfXMLToHelloMessageDecoder.java @@ -7,25 +7,46 @@ */ package org.opendaylight.controller.netconf.util.handler; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.io.ByteArrayInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage; import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader; +import org.opendaylight.controller.netconf.util.xml.XmlUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.w3c.dom.Document; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; +import org.xml.sax.SAXException; /** * Customized NetconfXMLToMessageDecoder that reads additional header with * session metadata from * {@link org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage} - * . Used by netconf server to retrieve information about session metadata. + * + * + * This handler should be replaced in pipeline by regular message handler as last step of negotiation. + * It serves as a message barrier and halts all non-hello netconf messages. + * Netconf messages after hello should be processed once the negotiation succeeded. + * */ -public class NetconfXMLToHelloMessageDecoder extends NetconfXMLToMessageDecoder { +public final class NetconfXMLToHelloMessageDecoder extends ByteToMessageDecoder { + private static final Logger LOG = LoggerFactory.getLogger(NetconfXMLToHelloMessageDecoder.class); private static final List POSSIBLE_ENDS = ImmutableList.of( new byte[] { ']', '\n' }, @@ -35,34 +56,73 @@ public class NetconfXMLToHelloMessageDecoder extends NetconfXMLToMessageDecoder new byte[] { '\r', '\n', '[' }, new byte[] { '\n', '[' }); - private String additionalHeaderCache; + // State variables do not have to by synchronized + // Netty uses always the same (1) thread per pipeline + // We use instance of this per pipeline + private List nonHelloMessages = Lists.newArrayList(); + private boolean helloReceived = false; @Override - protected byte[] preprocessMessageBytes(byte[] bytes) { - // Extract bytes containing header with additional metadata - - if (startsWithAdditionalHeader(bytes)) { - // Auth information containing username, ip address... extracted for monitoring - int endOfAuthHeader = getAdditionalHeaderEndIndex(bytes); - if (endOfAuthHeader > -1) { - byte[] additionalHeaderBytes = Arrays.copyOfRange(bytes, 0, endOfAuthHeader + 2); - additionalHeaderCache = additionalHeaderToString(additionalHeaderBytes); - bytes = Arrays.copyOfRange(bytes, endOfAuthHeader + 2, bytes.length); - } + @VisibleForTesting + public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws IOException, SAXException, NetconfDocumentedException { + if (in.readableBytes() == 0) { + LOG.debug("No more content in incoming buffer."); + return; } - return bytes; - } + in.markReaderIndex(); + try { + LOG.trace("Received to decode: {}", ByteBufUtil.hexDump(in)); + byte[] bytes = new byte[in.readableBytes()]; + in.readBytes(bytes); + + logMessage(bytes); + + // Extract bytes containing header with additional metadata + String additionalHeader = null; + if (startsWithAdditionalHeader(bytes)) { + // Auth information containing username, ip address... extracted for monitoring + int endOfAuthHeader = getAdditionalHeaderEndIndex(bytes); + if (endOfAuthHeader > -1) { + byte[] additionalHeaderBytes = Arrays.copyOfRange(bytes, 0, endOfAuthHeader + 2); + additionalHeader = additionalHeaderToString(additionalHeaderBytes); + bytes = Arrays.copyOfRange(bytes, endOfAuthHeader + 2, bytes.length); + } + } - @Override - protected void cleanUpAfterDecode() { - additionalHeaderCache = null; + Document doc = XmlUtil.readXmlToDocument(new ByteArrayInputStream(bytes)); + + final NetconfMessage message = getNetconfMessage(additionalHeader, doc); + if (message instanceof NetconfHelloMessage) { + Preconditions.checkState(helloReceived == false, + "Multiple hello messages received, unexpected hello: %s", + XmlUtil.toString(message.getDocument())); + out.add(message); + helloReceived = true; + // Non hello message, suspend the message and insert into cache + } else { + Preconditions.checkState(helloReceived, "Hello message not received, instead received: %s", + XmlUtil.toString(message.getDocument())); + LOG.debug("Netconf message received during negotiation, caching {}", + XmlUtil.toString(message.getDocument())); + nonHelloMessages.add(message); + } + } finally { + in.discardReadBytes(); + } } - @Override - protected NetconfMessage buildNetconfMessage(Document doc) { - return new NetconfHelloMessage(doc, additionalHeaderCache == null ? null - : NetconfHelloMessageAdditionalHeader.fromString(additionalHeaderCache)); + private NetconfMessage getNetconfMessage(final String additionalHeader, final Document doc) throws NetconfDocumentedException { + NetconfMessage msg = new NetconfMessage(doc); + if(NetconfHelloMessage.isHelloMessage(msg)) { + if (additionalHeader != null) { + return new NetconfHelloMessage(doc, NetconfHelloMessageAdditionalHeader.fromString(additionalHeader)); + } else { + return new NetconfHelloMessage(doc); + } + } + + return msg; } private int getAdditionalHeaderEndIndex(byte[] bytes) { @@ -102,15 +162,23 @@ public class NetconfXMLToHelloMessageDecoder extends NetconfXMLToMessageDecoder return -1; } + + private void logMessage(byte[] bytes) { + String s = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString(); + LOG.debug("Parsing message \n{}", s); + } + private boolean startsWithAdditionalHeader(byte[] bytes) { for (byte[] possibleStart : POSSIBLE_STARTS) { int i = 0; for (byte b : possibleStart) { - if(bytes[i++] != b) + if(bytes[i++] != b) { break; + } - if(i == possibleStart.length) + if(i == possibleStart.length) { return true; + } } } @@ -121,4 +189,10 @@ public class NetconfXMLToHelloMessageDecoder extends NetconfXMLToMessageDecoder return Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString(); } + /** + * @return Collection of NetconfMessages that were not hello, but were received during negotiation + */ + public Iterable getPostHelloNetconfMessages() { + return nonHelloMessages; + } }