BUG-1365 Check if channel was closed when negotiation fails 29/9029/3
authorMaros Marsalek <mmarsale@cisco.com>
Tue, 15 Jul 2014 13:55:34 +0000 (15:55 +0200)
committerMaros Marsalek <mmarsale@cisco.com>
Thu, 17 Jul 2014 06:42:59 +0000 (08:42 +0200)
Also set state only if promise is not yet finished.
Replace custom EOM aggregator with implementation provided by netty.

Change-Id: Iffb740fff1512ca14efe58ed5112f74ce5e75c97
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/AbstractNetconfSessionNegotiator.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfEOMAggregator.java
opendaylight/netconf/netconf-netty-util/src/main/java/org/opendaylight/controller/netconf/nettyutil/handler/NetconfMessageToXMLEncoder.java

index 1360a54..de3f732 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.netconf.nettyutil;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -104,7 +105,7 @@ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
 
     private void start() {
         final NetconfMessage helloMessage = this.sessionPreferences.getHelloMessage();
-        logger.debug("Session negotiation started with hello message {}", XmlUtil.toString(helloMessage.getDocument()));
+        logger.debug("Session negotiation started with hello message {} on channel {}", XmlUtil.toString(helloMessage.getDocument()), channel);
 
         channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
 
@@ -125,12 +126,20 @@ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
                         // Do not fail negotiation if promise is done or canceled
                         // It would result in setting result of the promise second time and that throws exception
                         if (isPromiseFinished() == false) {
-                            // FIXME BUG-1365 calling "negotiation failed" closes the channel, but the channel does not get closed if data is still being transferred
-                            // Loopback connection initiation might
                             negotiationFailed(new IllegalStateException("Session was not established after " + timeout));
+                            changeState(State.FAILED);
+
+                            channel.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
+                                @Override
+                                public void operationComplete(ChannelFuture future) throws Exception {
+                                    if(future.isSuccess()) {
+                                        logger.debug("Channel {} closed: success", future.channel());
+                                    } else {
+                                        logger.warn("Channel {} closed: fail", future.channel());
+                                    }
+                                }
+                            });
                         }
-
-                        changeState(State.FAILED);
                     } else if(channel.isOpen()) {
                         channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
                     }
@@ -214,9 +223,9 @@ extends AbstractSessionNegotiator<NetconfHelloMessage, S> {
     protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException;
 
     private synchronized void changeState(final State newState) {
-        logger.debug("Changing state from : {} to : {}", state, newState);
-        Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s", state,
-                newState);
+        logger.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel);
+        Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s for chanel %s", state,
+                newState, channel);
         this.state = newState;
     }
 
index f260bcb..a87a08d 100644 (file)
@@ -9,56 +9,15 @@
 package org.opendaylight.controller.netconf.nettyutil.handler;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
 import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
+public class NetconfEOMAggregator extends DelimiterBasedFrameDecoder {
 
-public class NetconfEOMAggregator extends ByteToMessageDecoder {
-    private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class);
+    public static final ByteBuf DELIMITER = Unpooled.wrappedBuffer(NetconfMessageConstants.END_OF_MESSAGE);
 
-    @Override
-    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
-        int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE);
-        if (index == -1) {
-            logger.debug("Message is not complete, read again.");
-            if (logger.isTraceEnabled()) {
-                String str = in.toString(Charsets.UTF_8);
-                logger.trace("Message read so far: {}", str);
-            }
-            ctx.read();
-        } else {
-            ByteBuf msg = in.readBytes(index);
-            in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length);
-            in.discardReadBytes();
-            logger.debug("Message is complete.");
-            out.add(msg);
-        }
+    public NetconfEOMAggregator() {
+        super(Integer.MAX_VALUE, DELIMITER);
     }
-
-    private int indexOfSequence(ByteBuf in, byte[] sequence) {
-        int index = -1;
-        for (int i = 0; i < in.readableBytes() - sequence.length + 1; i++) {
-            if (in.getByte(i) == sequence[0]) {
-                index = i;
-                for (int j = 1; j < sequence.length; j++) {
-                    if (in.getByte(i + j) != sequence[j]) {
-                        index = -1;
-                        break;
-                    }
-                }
-                if (index != -1) {
-                    return index;
-                }
-            }
-        }
-        return index;
-    }
-
 }
index fae2000..d810a87 100644 (file)
@@ -12,6 +12,7 @@ import io.netty.buffer.ByteBufOutputStream;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 
+import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
@@ -61,7 +62,8 @@ public class NetconfMessageToXMLEncoder extends MessageToByteEncoder<NetconfMess
             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
 
-            StreamResult result = new StreamResult(new OutputStreamWriter(os));
+            // Wrap OutputStreamWriter with BufferedWriter as suggested in javadoc for OutputStreamWriter
+            StreamResult result = new StreamResult(new BufferedWriter(new OutputStreamWriter(os)));
             DOMSource source = new DOMSource(msg.getDocument());
             transformer.transform(source, result);
         }