Reworked Netconf framing mechanism, added chunked framing mechanism.
[controller.git] / opendaylight / netconf / netconf-util / src / main / java / org / opendaylight / controller / netconf / util / AbstractNetconfSessionNegotiator.java
index 9069d85d88237388e0e041988626292fee0c34f0..3ed75a1b1684af1085347b9e822479720a5da83e 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.netconf.util;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+
 import io.netty.channel.Channel;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.Timeout;
@@ -18,9 +19,14 @@ import io.netty.util.TimerTask;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import io.netty.util.concurrent.Promise;
+
 import org.opendaylight.controller.netconf.api.NetconfMessage;
 import org.opendaylight.controller.netconf.api.NetconfSession;
 import org.opendaylight.controller.netconf.api.NetconfSessionPreferences;
+import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory;
+import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator;
+import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder;
+import org.opendaylight.controller.netconf.util.messages.FramingMechanism;
 import org.opendaylight.controller.netconf.util.xml.XmlElement;
 import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants;
 import org.opendaylight.controller.netconf.util.xml.XmlUtil;
@@ -29,6 +35,7 @@ import org.opendaylight.protocol.framework.SessionListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
+import org.w3c.dom.NodeList;
 
 import java.util.concurrent.TimeUnit;
 
@@ -114,6 +121,14 @@ public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionP
         final Document doc = netconfMessage.getDocument();
 
         if (isHelloMessage(doc)) {
+            if (containsBase11Capability(doc)
+                    && containsBase11Capability(sessionPreferences.getHelloMessage().getDocument())) {
+                channel.pipeline().replace("frameEncoder", "frameEncoder",
+                        FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
+                channel.pipeline().replace("aggregator", "aggregator",
+                        new NetconfMessageAggregator(FramingMechanism.CHUNK));
+                channel.pipeline().addAfter("aggregator", "chunkDecoder", new NetconfMessageChunkDecoder());
+            }
             changeState(State.ESTABLISHED);
             S session = getSession(sessionListener, channel, doc);
             negotiationSuccessful(session);
@@ -144,6 +159,16 @@ public abstract class AbstractNetconfSessionNegotiator<P extends NetconfSessionP
         this.state = newState;
     }
 
+    private boolean containsBase11Capability(final Document doc) {
+        final NodeList nList = doc.getElementsByTagName("capability");
+        for (int i = 0; i < nList.getLength(); i++) {
+            if (nList.item(i).getTextContent().contains("base:1.1")) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private static boolean isStateChangePermitted(State state, State newState) {
         if (state == State.IDLE && newState == State.OPEN_WAIT)
             return true;