Merge "Bug 809: Enhancements to the toaster example"
[controller.git] / opendaylight / netconf / netconf-util / src / main / java / org / opendaylight / controller / netconf / util / handler / NetconfXMLToHelloMessageDecoder.java
index 42586a5ecc60433d28a8fa89bbe57d3e958fdee1..361d4fcee908018eac90a54844569ced70c22a21 100644 (file)
@@ -7,25 +7,46 @@
  */
 package org.opendaylight.controller.netconf.util.handler;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
+import org.opendaylight.controller.netconf.api.NetconfDocumentedException;
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage;
 import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessageAdditionalHeader;
+import org.opendaylight.controller.netconf.util.xml.XmlUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
+import org.xml.sax.SAXException;
 
 /**
  * Customized NetconfXMLToMessageDecoder that reads additional header with
  * session metadata from
  * {@link org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage}
- * . Used by netconf server to retrieve information about session metadata.
+ *
+ *
+ * This handler should be replaced in pipeline by regular message handler as last step of negotiation.
+ * It serves as a message barrier and halts all non-hello netconf messages.
+ * Netconf messages after hello should be processed once the negotiation succeeded.
+ *
  */
-public class NetconfXMLToHelloMessageDecoder extends NetconfXMLToMessageDecoder {
+public final class NetconfXMLToHelloMessageDecoder extends ByteToMessageDecoder {
+    private static final Logger LOG = LoggerFactory.getLogger(NetconfXMLToHelloMessageDecoder.class);
 
     private static final List<byte[]> POSSIBLE_ENDS = ImmutableList.of(
             new byte[] { ']', '\n' },
@@ -35,34 +56,73 @@ public class NetconfXMLToHelloMessageDecoder extends NetconfXMLToMessageDecoder
             new byte[] { '\r', '\n', '[' },
             new byte[] { '\n', '[' });
 
-    private String additionalHeaderCache;
+    // State variables do not have to by synchronized
+    // Netty uses always the same (1) thread per pipeline
+    // We use instance of this per pipeline
+    private List<NetconfMessage> nonHelloMessages = Lists.newArrayList();
+    private boolean helloReceived = false;
 
     @Override
-    protected byte[] preprocessMessageBytes(byte[] bytes) {
-        // Extract bytes containing header with additional metadata
-
-        if (startsWithAdditionalHeader(bytes)) {
-            // Auth information containing username, ip address... extracted for monitoring
-            int endOfAuthHeader = getAdditionalHeaderEndIndex(bytes);
-            if (endOfAuthHeader > -1) {
-                byte[] additionalHeaderBytes = Arrays.copyOfRange(bytes, 0, endOfAuthHeader + 2);
-                additionalHeaderCache = additionalHeaderToString(additionalHeaderBytes);
-                bytes = Arrays.copyOfRange(bytes, endOfAuthHeader + 2, bytes.length);
-            }
+    @VisibleForTesting
+    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws IOException, SAXException, NetconfDocumentedException {
+        if (in.readableBytes() == 0) {
+            LOG.debug("No more content in incoming buffer.");
+            return;
         }
 
-        return bytes;
-    }
+        in.markReaderIndex();
+        try {
+            LOG.trace("Received to decode: {}", ByteBufUtil.hexDump(in));
+            byte[] bytes = new byte[in.readableBytes()];
+            in.readBytes(bytes);
+
+            logMessage(bytes);
+
+            // Extract bytes containing header with additional metadata
+            String additionalHeader = null;
+            if (startsWithAdditionalHeader(bytes)) {
+                // Auth information containing username, ip address... extracted for monitoring
+                int endOfAuthHeader = getAdditionalHeaderEndIndex(bytes);
+                if (endOfAuthHeader > -1) {
+                    byte[] additionalHeaderBytes = Arrays.copyOfRange(bytes, 0, endOfAuthHeader + 2);
+                    additionalHeader = additionalHeaderToString(additionalHeaderBytes);
+                    bytes = Arrays.copyOfRange(bytes, endOfAuthHeader + 2, bytes.length);
+                }
+            }
 
-    @Override
-    protected void cleanUpAfterDecode() {
-        additionalHeaderCache = null;
+            Document doc = XmlUtil.readXmlToDocument(new ByteArrayInputStream(bytes));
+
+            final NetconfMessage message = getNetconfMessage(additionalHeader, doc);
+            if (message instanceof NetconfHelloMessage) {
+                Preconditions.checkState(helloReceived == false,
+                        "Multiple hello messages received, unexpected hello: %s",
+                        XmlUtil.toString(message.getDocument()));
+                out.add(message);
+                helloReceived = true;
+            // Non hello message, suspend the message and insert into cache
+            } else {
+                Preconditions.checkState(helloReceived, "Hello message not received, instead received: %s",
+                        XmlUtil.toString(message.getDocument()));
+                LOG.debug("Netconf message received during negotiation, caching {}",
+                        XmlUtil.toString(message.getDocument()));
+                nonHelloMessages.add(message);
+            }
+        } finally {
+            in.discardReadBytes();
+        }
     }
 
-    @Override
-    protected NetconfMessage buildNetconfMessage(Document doc) {
-        return new NetconfHelloMessage(doc, additionalHeaderCache == null ? null
-                : NetconfHelloMessageAdditionalHeader.fromString(additionalHeaderCache));
+    private NetconfMessage getNetconfMessage(final String additionalHeader, final Document doc) throws NetconfDocumentedException {
+        NetconfMessage msg = new NetconfMessage(doc);
+        if(NetconfHelloMessage.isHelloMessage(msg)) {
+            if (additionalHeader != null) {
+                return new NetconfHelloMessage(doc, NetconfHelloMessageAdditionalHeader.fromString(additionalHeader));
+            } else {
+                return new NetconfHelloMessage(doc);
+            }
+        }
+
+        return msg;
     }
 
     private int getAdditionalHeaderEndIndex(byte[] bytes) {
@@ -102,15 +162,23 @@ public class NetconfXMLToHelloMessageDecoder extends NetconfXMLToMessageDecoder
         return -1;
     }
 
+
+    private void logMessage(byte[] bytes) {
+        String s = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
+        LOG.debug("Parsing message \n{}", s);
+    }
+
     private boolean startsWithAdditionalHeader(byte[] bytes) {
         for (byte[] possibleStart : POSSIBLE_STARTS) {
             int i = 0;
             for (byte b : possibleStart) {
-                if(bytes[i++] != b)
+                if(bytes[i++] != b) {
                     break;
+                }
 
-                if(i == possibleStart.length)
+                if(i == possibleStart.length) {
                     return true;
+                }
             }
         }
 
@@ -121,4 +189,10 @@ public class NetconfXMLToHelloMessageDecoder extends NetconfXMLToMessageDecoder
         return Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)).toString();
     }
 
+    /**
+     * @return Collection of NetconfMessages that were not hello, but were received during negotiation
+     */
+    public Iterable<NetconfMessage> getPostHelloNetconfMessages() {
+        return nonHelloMessages;
+    }
 }