X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fnetconf%2Fnetconf-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Futil%2FAbstractNetconfSessionNegotiator.java;fp=opendaylight%2Fnetconf%2Fnetconf-util%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fnetconf%2Futil%2FAbstractNetconfSessionNegotiator.java;h=b0c8c6dc19e6b3b6c97f90b21fe0e1dc680e0ba3;hb=79df3ca27a571091af4d8cf766ef81864aa70c2f;hp=5521e28818b20fcb9291089ced8867cce787e2e6;hpb=61bad4207cc91cd14d8d38a255ad6549c20ff54e;p=controller.git diff --git a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java index 5521e28818..b0c8c6dc19 100644 --- a/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java +++ b/opendaylight/netconf/netconf-util/src/main/java/org/opendaylight/controller/netconf/util/AbstractNetconfSessionNegotiator.java @@ -28,6 +28,7 @@ import org.opendaylight.controller.netconf.api.NetconfSessionPreferences; import org.opendaylight.controller.netconf.util.handler.FramingMechanismHandlerFactory; import org.opendaylight.controller.netconf.util.handler.NetconfChunkAggregator; import org.opendaylight.controller.netconf.util.handler.NetconfMessageToXMLEncoder; +import org.opendaylight.controller.netconf.util.handler.NetconfXMLToHelloMessageDecoder; import org.opendaylight.controller.netconf.util.handler.NetconfXMLToMessageDecoder; import org.opendaylight.controller.netconf.util.messages.FramingMechanism; import org.opendaylight.controller.netconf.util.messages.NetconfHelloMessage; @@ -74,7 +75,7 @@ extends AbstractSessionNegotiator { } @Override - protected void startNegotiation() { + protected final void startNegotiation() { final Optional sslHandler = getSslHandler(channel); if (sslHandler.isPresent()) { Future future = sslHandler.get().handshakeFuture(); @@ -125,27 +126,22 @@ extends AbstractSessionNegotiator { // FIXME, make sessionPreferences return HelloMessage, move NetconfHelloMessage to API sendMessage((NetconfHelloMessage)helloMessage); + + replaceHelloMessageOutboundHandler(); changeState(State.OPEN_WAIT); } + private void cancelTimeout() { if(timeout!=null) { timeout.cancel(); } } - @Override - protected void handleMessage(NetconfHelloMessage netconfMessage) throws NetconfDocumentedException { - S session = getSessionForHelloMessage(netconfMessage) ; - negotiationSuccessful(session); - } - protected final S getSessionForHelloMessage(NetconfHelloMessage netconfMessage) throws NetconfDocumentedException { Preconditions.checkNotNull(netconfMessage, "netconfMessage"); final Document doc = netconfMessage.getDocument(); - replaceHelloMessageHandlers(); - if (shouldUseChunkFraming(doc)) { insertChunkFramingToPipeline(); } @@ -157,23 +153,44 @@ extends AbstractSessionNegotiator { /** * Insert chunk framing handlers into the pipeline */ - protected void insertChunkFramingToPipeline() { + private void insertChunkFramingToPipeline() { replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_FRAME_ENCODER, FramingMechanismHandlerFactory.createHandler(FramingMechanism.CHUNK)); replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_AGGREGATOR, new NetconfChunkAggregator()); } - protected boolean shouldUseChunkFraming(Document doc) { + private boolean shouldUseChunkFraming(Document doc) { return containsBase11Capability(doc) && containsBase11Capability(sessionPreferences.getHelloMessage().getDocument()); } /** - * Remove special handlers for hello message. Insert regular netconf xml message (en|de)coders. + * Remove special inbound handler for hello message. Insert regular netconf xml message (en|de)coders. + * + * Inbound hello message handler should be kept until negotiation is successful + * It caches any non-hello messages while negotiation is still in progress + */ + protected final void replaceHelloMessageInboundHandler(final S session) { + ChannelHandler helloMessageHandler = replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder()); + + Preconditions.checkState(helloMessageHandler instanceof NetconfXMLToHelloMessageDecoder, + "Pipeline handlers misplaced on session: %s, pipeline: %s", session, channel.pipeline()); + Iterable netconfMessagesFromNegotiation = + ((NetconfXMLToHelloMessageDecoder) helloMessageHandler).getPostHelloNetconfMessages(); + + // Process messages received during negotiation + // The hello message handler does not have to be synchronized, since it is always call from the same thread by netty + // It means, we are now using the thread now + for (NetconfMessage message : netconfMessagesFromNegotiation) { + session.handleMessage(message); + } + } + + /** + * Remove special outbound handler for hello message. Insert regular netconf xml message (en|de)coders. */ - protected void replaceHelloMessageHandlers() { - replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_DECODER, new NetconfXMLToMessageDecoder()); + private void replaceHelloMessageOutboundHandler() { replaceChannelHandler(channel, AbstractChannelInitializer.NETCONF_MESSAGE_ENCODER, new NetconfMessageToXMLEncoder()); } @@ -183,7 +200,7 @@ extends AbstractSessionNegotiator { protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException; - protected synchronized void changeState(final State newState) { + private synchronized void changeState(final State newState) { logger.debug("Changing state from : {} to : {}", state, newState); Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s", state, newState);