From fb9855b61dff67ca02ee860c8fd5ec753e11ddab Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Tue, 15 Jul 2014 15:55:34 +0200 Subject: [PATCH] BUG-1365 Check if channel was closed when negotiation fails Also set state only if promise is not yet finished. Replace custom EOM aggregator with implementation provided by netty. Change-Id: Iffb740fff1512ca14efe58ed5112f74ce5e75c97 Signed-off-by: Maros Marsalek --- .../AbstractNetconfSessionNegotiator.java | 25 ++++++--- .../handler/NetconfEOMAggregator.java | 53 +++---------------- .../handler/NetconfMessageToXMLEncoder.java | 4 +- 3 files changed, 26 insertions(+), 56 deletions(-) diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionNegotiator.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionNegotiator.java index 1360a54d6f..de3f732b25 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionNegotiator.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionNegotiator.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.netconf.nettyutil; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -104,7 +105,7 @@ extends AbstractSessionNegotiator { private void start() { final NetconfMessage helloMessage = this.sessionPreferences.getHelloMessage(); - logger.debug("Session negotiation started with hello message {}", XmlUtil.toString(helloMessage.getDocument())); + logger.debug("Session negotiation started with hello message {} on channel {}", XmlUtil.toString(helloMessage.getDocument()), channel); channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler()); @@ -125,12 +126,20 @@ extends AbstractSessionNegotiator { // Do not fail negotiation if promise is done or canceled // It would result in setting result of the promise second time and that throws exception if (isPromiseFinished() == false) { - // FIXME BUG-1365 calling "negotiation failed" closes the channel, but the channel does not get closed if data is still being transferred - // Loopback connection initiation might negotiationFailed(new IllegalStateException("Session was not established after " + timeout)); + changeState(State.FAILED); + + channel.closeFuture().addListener(new GenericFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if(future.isSuccess()) { + logger.debug("Channel {} closed: success", future.channel()); + } else { + logger.warn("Channel {} closed: fail", future.channel()); + } + } + }); } - - changeState(State.FAILED); } else if(channel.isOpen()) { channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER); } @@ -214,9 +223,9 @@ extends AbstractSessionNegotiator { protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException; private synchronized void changeState(final State newState) { - logger.debug("Changing state from : {} to : {}", state, newState); - Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s", state, - newState); + logger.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel); + Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s for chanel %s", state, + newState, channel); this.state = newState; } diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfEOMAggregator.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfEOMAggregator.java index f260bcbcef..a87a08ded7 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfEOMAggregator.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfEOMAggregator.java @@ -9,56 +9,15 @@ package org.opendaylight.controller.netconf.nettyutil.handler; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.ByteToMessageDecoder; - -import java.util.List; - +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; +public class NetconfEOMAggregator extends DelimiterBasedFrameDecoder { -public class NetconfEOMAggregator extends ByteToMessageDecoder { - private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class); + public static final ByteBuf DELIMITER = Unpooled.wrappedBuffer(NetconfMessageConstants.END_OF_MESSAGE); - @Override - protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) { - int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE); - if (index == -1) { - logger.debug("Message is not complete, read again."); - if (logger.isTraceEnabled()) { - String str = in.toString(Charsets.UTF_8); - logger.trace("Message read so far: {}", str); - } - ctx.read(); - } else { - ByteBuf msg = in.readBytes(index); - in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length); - in.discardReadBytes(); - logger.debug("Message is complete."); - out.add(msg); - } + public NetconfEOMAggregator() { + super(Integer.MAX_VALUE, DELIMITER); } - - private int indexOfSequence(ByteBuf in, byte[] sequence) { - int index = -1; - for (int i = 0; i < in.readableBytes() - sequence.length + 1; i++) { - if (in.getByte(i) == sequence[0]) { - index = i; - for (int j = 1; j < sequence.length; j++) { - if (in.getByte(i + j) != sequence[j]) { - index = -1; - break; - } - } - if (index != -1) { - return index; - } - } - } - return index; - } - } diff --git a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToXMLEncoder.java b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToXMLEncoder.java index fae2000bb5..d810a870ff 100644 --- a/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToXMLEncoder.java +++ b/opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToXMLEncoder.java @@ -12,6 +12,7 @@ import io.netty.buffer.ByteBufOutputStream; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; +import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStream; import java.io.OutputStreamWriter; @@ -61,7 +62,8 @@ public class NetconfMessageToXMLEncoder extends MessageToByteEncoder