X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Futil%2FAbstractNetconfSessionNegotiator.java;h=b0c8c6dc19e6b3b6c97f90b21fe0e1dc680e0ba3;hb=2f034183cfcbaacf2a287f1ddf00d367674868af;hp=71f08339c803774d3d3b50a701b2d55454e9081e;hpb=2e92a15f60261432cde594d86f033802f7410e17;p=controller.git diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java index 71f08339c8..b0c8c6dc19 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java @@ -8,9 +8,12 @@ package org.opendaylight.controller.netconf.util; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.ssl.SslHandler; import io.netty.util.Timeout; import io.netty.util.Timer; @@ -18,17 +21,14 @@ import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.Promise; - -import java.util.concurrent.TimeUnit; - -import io.netty.channel.ChannelInboundHandlerAdapter; -import org.opendaylight.controller.netconf.api.AbstractNetconfSession; +import org.opendaylight.controller.netconf.api.NetconfDocumentedException; import org.opendaylight.controller.netconf.api.NetconfMessage; import org.opendaylight.controller.netconf.api.NetconfSessionListener; import org.opendaylight.controller.netconf.api.NetconfSessionPreferences; import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory; import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator; import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder; +import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder; import org.opendaylight.controller.netconf.util.messages.FramingMechanism; import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage; @@ -39,16 +39,16 @@ import org.slf4j.LoggerFactory; import org.w3c.dom.Document; import org.w3c.dom.NodeList; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; +import java.util.concurrent.TimeUnit; public abstract class AbstractNetconfSessionNegotiator

, L extends NetconfSessionListener> extends AbstractSessionNegotiator { private static final Logger logger = LoggerFactory.getLogger(AbstractNetconfSessionNegotiator.class); + public static final String NAME_OF_EXCEPTION_HANDLER = "lastExceptionHandler"; - private final P sessionPreferences; + protected final P sessionPreferences; private final L sessionListener; private Timeout timeout; @@ -56,7 +56,7 @@ extends AbstractSessionNegotiator { /** * Possible states for Finite State Machine */ - private enum State { + protected enum State { IDLE, OPEN_WAIT, FAILED, ESTABLISHED } @@ -64,6 +64,7 @@ extends AbstractSessionNegotiator { private final Timer timer; private final long connectionTimeoutMillis; + // TODO shrink constructor protected AbstractNetconfSessionNegotiator(P sessionPreferences, Promise promise, Channel channel, Timer timer, L sessionListener, long connectionTimeoutMillis) { super(promise, channel); @@ -74,7 +75,7 @@ extends AbstractSessionNegotiator { } @Override - protected void startNegotiation() { + protected final void startNegotiation() { final Optional sslHandler = getSslHandler(channel); if (sslHandler.isPresent()) { Future future = sslHandler.get().handshakeFuture(); @@ -125,6 +126,8 @@ extends AbstractSessionNegotiator { // FIXME, make sessionPreferences return HelloMessage, move NetconfHelloMessage to API sendMessage((NetconfHelloMessage)helloMessage); + + replaceHelloMessageOutboundHandler(); changeState(State.OPEN_WAIT); } @@ -134,22 +137,17 @@ extends AbstractSessionNegotiator { } } - @Override - protected void handleMessage(NetconfHelloMessage netconfMessage) { - Preconditions.checkNotNull(netconfMessage != null, "netconfMessage"); + protected final S getSessionForHelloMessage(NetconfHelloMessage netconfMessage) throws NetconfDocumentedException { + Preconditions.checkNotNull(netconfMessage, "netconfMessage"); final Document doc = netconfMessage.getDocument(); - replaceHelloMessageHandlers(); - if (shouldUseChunkFraming(doc)) { insertChunkFramingToPipeline(); } changeState(State.ESTABLISHED); - S session = getSession(sessionListener, channel, netconfMessage); - - negotiationSuccessful(session); + return getSession(sessionListener, channel, netconfMessage); } /** @@ -168,10 +166,31 @@ extends AbstractSessionNegotiator { } /** - * Remove special handlers for hello message. Insert regular netconf xml message (en|de)coders. + * Remove special inbound handler for hello message. Insert regular netconf xml message (en|de)coders. + * + * Inbound hello message handler should be kept until negotiation is successful + * It caches any non-hello messages while negotiation is still in progress */ - private void replaceHelloMessageHandlers() { - replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder()); + protected final void replaceHelloMessageInboundHandler(final S session) { + ChannelHandler helloMessageHandler = replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder()); + + Preconditions.checkState(helloMessageHandler instanceof NetconfXMLToHelloMessageDecoder, + "Pipeline handlers misplaced on session: %s, pipeline: %s", session, channel.pipeline()); + Iterable netconfMessagesFromNegotiation = + ((NetconfXMLToHelloMessageDecoder) helloMessageHandler).getPostHelloNetconfMessages(); + + // Process messages received during negotiation + // The hello message handler does not have to be synchronized, since it is always call from the same thread by netty + // It means, we are now using the thread now + for (NetconfMessage message : netconfMessagesFromNegotiation) { + session.handleMessage(message); + } + } + + /** + * Remove special outbound handler for hello message. Insert regular netconf xml message (en|de)coders. + */ + private void replaceHelloMessageOutboundHandler() { replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, new NetconfMessageToXMLEncoder()); } @@ -179,7 +198,7 @@ extends AbstractSessionNegotiator { return channel.pipeline().replace(handlerKey, handlerKey, decoder); } - protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message); + protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException; private synchronized void changeState(final State newState) { logger.debug("Changing state from : {} to : {}", state, newState); @@ -208,7 +227,6 @@ extends AbstractSessionNegotiator { if (state == State.OPEN_WAIT && newState == State.FAILED) { return true; } - logger.debug("Transition from {} to {} is not allowed", state, newState); return false; } @@ -217,7 +235,6 @@ extends AbstractSessionNegotiator { * Handler to catch exceptions in pipeline during negotiation */ private final class ExceptionHandlingInboundChannelHandler extends ChannelInboundHandlerAdapter { - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { logger.warn("An exception occurred during negotiation on channel {}", channel.localAddress(), cause);