X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Futil%2FAbstractNetconfSessionNegotiator.java;h=724b45bc08d867f6702ae75a11aece291fefd484;hb=59a26f11e6f07d6b36aa820562ad00e367dcaa3b;hp=e30ce5b47e961f87adccebcce363f2092da9c165;hpb=576aa6018e48dfca8f223b7ac929139a32135201;p=controller.git diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java index e30ce5b47e..724b45bc08 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java @@ -13,6 +13,7 @@ import com.google.common.base.Preconditions; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.ssl.SslHandler; import io.netty.util.Timeout; import io.netty.util.Timer; @@ -21,17 +22,16 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; import org.opendaylight.controller.netconf.api.NetconfMessage; -import org.opendaylight.controller.netconf.api.NetconfSession; +import org.opendaylight.controller.netconf.api.NetconfSessionListener; import org.opendaylight.controller.netconf.api.NetconfSessionPreferences; import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageAggregator; -import org.opendaylight.controller.netconf.util.handler.NetconfMessageChunkDecoder; +import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator; +import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder; +import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder; import org.opendaylight.controller.netconf.util.messages.FramingMechanism; -import org.opendaylight.controller.netconf.util.xml.XmlElement; -import org.opendaylight.controller.netconf.util.xml.XmlNetconfConstants; +import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage; import org.opendaylight.controller.netconf.util.xml.XmlUtil; import org.opendaylight.protocol.framework.AbstractSessionNegotiator; -import org.opendaylight.protocol.framework.SessionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; @@ -39,36 +39,37 @@ import org.w3c.dom.NodeList; import java.util.concurrent.TimeUnit; -public abstract class AbstractNetconfSessionNegotiator
- extends AbstractSessionNegotiator , L extends NetconfSessionListener future = sslHandler.get().handshakeFuture();
future.addListener(new GenericFutureListener absent() : Optional.of(sslHandler);
}
+ public P getSessionPreferences() {
+ return sessionPreferences;
+ }
+
private void start() {
final NetconfMessage helloMessage = this.sessionPreferences.getHelloMessage();
logger.debug("Session negotiation started with hello message {}", XmlUtil.toString(helloMessage.getDocument()));
- channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ChannelHandler() {
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- }
-
- @Override
- public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- logger.warn("An exception occurred during negotiation on channel {}", channel.localAddress(), cause);
- cancelTimeout();
- negotiationFailed(cause);
- changeState(State.FAILED);
- }
- });
+ channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
timeout = this.timer.newTimeout(new TimerTask() {
@Override
- public void run(final Timeout timeout) throws Exception {
+ public void run(final Timeout timeout) {
synchronized (this) {
if (state != State.ESTABLISHED) {
+ logger.debug("Connection timeout after {}, session is in state {}", timeout, state);
final IllegalStateException cause = new IllegalStateException(
"Session was not established after " + timeout);
negotiationFailed(cause);
changeState(State.FAILED);
- } else
+ } else if(channel.isOpen()) {
channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
+ }
}
}
- }, INITIAL_HOLDTIMER, TimeUnit.MINUTES);
+ }, connectionTimeoutMillis, TimeUnit.MILLISECONDS);
- sendMessage(helloMessage);
+ // FIXME, make sessionPreferences return HelloMessage, move NetconfHelloMessage to API
+ sendMessage((NetconfHelloMessage)helloMessage);
changeState(State.OPEN_WAIT);
}
-
private void cancelTimeout() {
- if(timeout!=null)
+ if(timeout!=null) {
timeout.cancel();
+ }
}
- private void sendMessage(NetconfMessage message) {
- this.channel.writeAndFlush(message);
+ @Override
+ protected void handleMessage(NetconfHelloMessage netconfMessage) {
+ S session = getSessionForHelloMessage(netconfMessage);
+ negotiationSuccessful(session);
}
- @Override
- protected void handleMessage(NetconfMessage netconfMessage) {
+ protected final S getSessionForHelloMessage(NetconfHelloMessage netconfMessage) {
+ Preconditions.checkNotNull(netconfMessage, "netconfMessage");
+
final Document doc = netconfMessage.getDocument();
- if (isHelloMessage(doc)) {
- if (containsBase11Capability(doc)
- && containsBase11Capability(sessionPreferences.getHelloMessage().getDocument())) {
- channel.pipeline().replace("frameEncoder", "frameEncoder",
- FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
- channel.pipeline().replace("aggregator", "aggregator",
- new NetconfMessageAggregator(FramingMechanism.CHUNK));
- channel.pipeline().addAfter("aggregator", "chunkDecoder", new NetconfMessageChunkDecoder());
- }
- changeState(State.ESTABLISHED);
- S session = getSession(sessionListener, channel, netconfMessage);
- negotiationSuccessful(session);
- } else {
- final IllegalStateException cause = new IllegalStateException(
- "Received message was not hello as expected, but was " + XmlUtil.toString(doc));
- logger.warn("Negotiation of netconf session failed", cause);
- negotiationFailed(cause);
+ replaceHelloMessageHandlers();
+
+ if (shouldUseChunkFraming(doc)) {
+ insertChunkFramingToPipeline();
}
+
+ changeState(State.ESTABLISHED);
+ return getSession(sessionListener, channel, netconfMessage);
}
- protected abstract S getSession(SessionListener sessionListener, Channel channel, NetconfMessage message);
+ /**
+ * Insert chunk framing handlers into the pipeline
+ */
+ protected void insertChunkFramingToPipeline() {
+ replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER,
+ FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK));
+ replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR,
+ new NetconfChunkAggregator());
+ }
- private boolean isHelloMessage(Document doc) {
- try {
- XmlElement.fromDomElementWithExpected(doc.getDocumentElement(), "hello",
- XmlNetconfConstants.URN_IETF_PARAMS_XML_NS_NETCONF_BASE_1_0);
+ protected boolean shouldUseChunkFraming(Document doc) {
+ return containsBase11Capability(doc)
+ && containsBase11Capability(sessionPreferences.getHelloMessage().getDocument());
+ }
- } catch (IllegalArgumentException | IllegalStateException e) {
- return false;
- }
- return true;
+ /**
+ * Remove special handlers for hello message. Insert regular netconf xml message (en|de)coders.
+ */
+ protected void replaceHelloMessageHandlers() {
+ replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder());
+ replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, new NetconfMessageToXMLEncoder());
+ }
+
+ private static ChannelHandler replaceChannelHandler(Channel channel, String handlerKey, ChannelHandler decoder) {
+ return channel.pipeline().replace(handlerKey, handlerKey, decoder);
}
- private void changeState(final State newState) {
+ protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message);
+
+ protected synchronized void changeState(final State newState) {
logger.debug("Changing state from : {} to : {}", state, newState);
Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s", state,
newState);
@@ -198,13 +200,29 @@ public abstract class AbstractNetconfSessionNegotiator>
+extends AbstractSessionNegotiator promise, Channel channel, Timer timer,
- SessionListener sessionListener) {
+ L sessionListener, long connectionTimeoutMillis) {
super(promise, channel);
this.sessionPreferences = sessionPreferences;
this.timer = timer;
this.sessionListener = sessionListener;
+ this.connectionTimeoutMillis = connectionTimeoutMillis;
}
@Override
@@ -78,14 +79,15 @@ public abstract class AbstractNetconfSessionNegotiator