X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Futil%2FAbstractNetconfSessionNegotiator.java;h=b0c8c6dc19e6b3b6c97f90b21fe0e1dc680e0ba3;hb=56d545c4a73d0d87c0d25ba20cfa4b398c87c511;hp=3ed75a1b1684af1085347b9e822479720a5da83e;hpb=e6bcd06e610be274e8f2df901b61789bb17c442a;p=controller.git diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java index 3ed75a1b16..b0c8c6dc19 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java @@ -10,8 +10,10 @@ package org.opendaylight.controller.netconf.util; import com.google.common.base.Optional; import com.google.common.base.Preconditions; - import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.ssl.SslHandler; import io.netty.util.Timeout; import io.netty.util.Timer; @@ -19,19 +21,19 @@ import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; - +import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.controller.netconf.api.NetconfSession; +import org.opendaylight.controller.netconf.api.NetconfSessionListener; import org.opendaylight.controller.netconf.api.NetconfSessionPreferences; import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder; +import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator; +import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder; +import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder; +import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder; import org.opendaylight.controller.netconf.util.messages.FramingMechanism; -import org.opendaylight.controller.netconf.util.xml.XmlElement; -import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.protocol.framework.AbstractSessionNegotiator; -import org.opendaylight.protocol.framework.SessionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; @@ -39,50 +41,55 @@ import org.w3c.dom.NodeList; import java.util.concurrent.TimeUnit; -public abstract class AbstractNetconfSessionNegotiator
- extends AbstractSessionNegotiator , L extends NetconfSessionListener absent() : Optional.of(sslHandler);
}
+ public P getSessionPreferences() {
+ return sessionPreferences;
+ }
+
private void start() {
final NetconfMessage helloMessage = this.sessionPreferences.getHelloMessage();
logger.debug("Session negotiation started with hello message {}", XmlUtil.toString(helloMessage.getDocument()));
- sendMessage(helloMessage);
- changeState(State.OPEN_WAIT);
+ channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
- this.timer.newTimeout(new TimerTask() {
+ timeout = this.timer.newTimeout(new TimerTask() {
@Override
- public void run(final Timeout timeout) throws Exception {
+ public void run(final Timeout timeout) {
synchronized (this) {
if (state != State.ESTABLISHED) {
+ logger.debug("Connection timeout after {}, session is in state {}", timeout, state);
final IllegalStateException cause = new IllegalStateException(
"Session was not established after " + timeout);
negotiationFailed(cause);
changeState(State.FAILED);
+ } else if(channel.isOpen()) {
+ channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
}
}
}
- }, INITIAL_HOLDTIMER, TimeUnit.MINUTES);
+ }, connectionTimeoutMillis, TimeUnit.MILLISECONDS);
+
+ // FIXME, make sessionPreferences return HelloMessage, move NetconfHelloMessage to API
+ sendMessage((NetconfHelloMessage)helloMessage);
+
+ replaceHelloMessageOutboundHandler();
+ changeState(State.OPEN_WAIT);
}
- private void sendMessage(NetconfMessage message) {
- this.channel.writeAndFlush(message);
+ private void cancelTimeout() {
+ if(timeout!=null) {
+ timeout.cancel();
+ }
}
- @Override
- protected void handleMessage(NetconfMessage netconfMessage) {
+ protected final S getSessionForHelloMessage(NetconfHelloMessage netconfMessage) throws NetconfDocumentedException {
+ Preconditions.checkNotNull(netconfMessage, "netconfMessage");
+
final Document doc = netconfMessage.getDocument();
- if (isHelloMessage(doc)) {
- if (containsBase11Capability(doc)
- && containsBase11Capability(sessionPreferences.getHelloMessage().getDocument())) {
- channel.pipeline().replace("frameEncoder", "frameEncoder",
- FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
- channel.pipeline().replace("aggregator", "aggregator",
- new NetconfMessageAggregator(FramingMechanism.CHUNK));
- channel.pipeline().addAfter("aggregator", "chunkDecoder", new NetconfMessageChunkDecoder());
- }
- changeState(State.ESTABLISHED);
- S session = getSession(sessionListener, channel, doc);
- negotiationSuccessful(session);
- } else {
- final IllegalStateException cause = new IllegalStateException(
- "Received message was not hello as expected, but was " + XmlUtil.toString(doc));
- negotiationFailed(cause);
+ if (shouldUseChunkFraming(doc)) {
+ insertChunkFramingToPipeline();
}
+
+ changeState(State.ESTABLISHED);
+ return getSession(sessionListener, channel, netconfMessage);
}
- protected abstract S getSession(SessionListener sessionListener, Channel channel, Document doc);
+ /**
+ * Insert chunk framing handlers into the pipeline
+ */
+ private void insertChunkFramingToPipeline() {
+ replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER,
+ FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
+ replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
+ new NetconfChunkAggregator());
+ }
- private boolean isHelloMessage(Document doc) {
- try {
- XmlElement.fromDomElementWithExpected(doc.getDocumentElement(), "hello",
- XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0);
+ private boolean shouldUseChunkFraming(Document doc) {
+ return containsBase11Capability(doc)
+ && containsBase11Capability(sessionPreferences.getHelloMessage().getDocument());
+ }
- } catch (IllegalArgumentException | IllegalStateException e) {
- return false;
+ /**
+ * Remove special inbound handler for hello message. Insert regular netconf xml message (en|de)coders.
+ *
+ * Inbound hello message handler should be kept until negotiation is successful
+ * It caches any non-hello messages while negotiation is still in progress
+ */
+ protected final void replaceHelloMessageInboundHandler(final S session) {
+ ChannelHandler helloMessageHandler = replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder());
+
+ Preconditions.checkState(helloMessageHandler instanceof NetconfXMLToHelloMessageDecoder,
+ "Pipeline handlers misplaced on session: %s, pipeline: %s", session, channel.pipeline());
+ Iterable>
+extends AbstractSessionNegotiator promise, Channel channel, Timer timer,
- SessionListener sessionListener) {
+ L sessionListener, long connectionTimeoutMillis) {
super(promise, channel);
this.sessionPreferences = sessionPreferences;
this.timer = timer;
this.sessionListener = sessionListener;
+ this.connectionTimeoutMillis = connectionTimeoutMillis;
}
@Override
- protected void startNegotiation() throws Exception {
+ protected final void startNegotiation() {
final Optional