Also set state only if promise is not yet finished.
Replace custom EOM aggregator with implementation provided by netty.
Change-Id: Iffb740fff1512ca14efe58ed5112f74ce5e75c97
Signed-off-by: Maros Marsalek <mmarsale@cisco.com>
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
private void start() {
final NetconfMessage helloMessage = this.sessionPreferences.getHelloMessage();
- logger.debug("Session negotiation started with hello message {}", XmlUtil.toString(helloMessage.getDocument()));
+ logger.debug("Session negotiation started with hello message {} on channel {}", XmlUtil.toString(helloMessage.getDocument()), channel);
channel.pipeline().addLast(NAME_OF_EXCEPTION_HANDLER, new ExceptionHandlingInboundChannelHandler());
// Do not fail negotiation if promise is done or canceled
// It would result in setting result of the promise second time and that throws exception
if (isPromiseFinished() == false) {
- // FIXME BUG-1365 calling "negotiation failed" closes the channel, but the channel does not get closed if data is still being transferred
- // Loopback connection initiation might
negotiationFailed(new IllegalStateException("Session was not established after " + timeout));
+ changeState(State.FAILED);
+
+ channel.closeFuture().addListener(new GenericFutureListener<ChannelFuture>() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if(future.isSuccess()) {
+ logger.debug("Channel {} closed: success", future.channel());
+ } else {
+ logger.warn("Channel {} closed: fail", future.channel());
+ }
+ }
+ });
}
-
- changeState(State.FAILED);
} else if(channel.isOpen()) {
channel.pipeline().remove(NAME_OF_EXCEPTION_HANDLER);
}
protected abstract S getSession(L sessionListener, Channel channel, NetconfHelloMessage message) throws NetconfDocumentedException;
private synchronized void changeState(final State newState) {
- logger.debug("Changing state from : {} to : {}", state, newState);
- Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s", state,
- newState);
+ logger.debug("Changing state from : {} to : {} for channel: {}", state, newState, channel);
+ Preconditions.checkState(isStateChangePermitted(state, newState), "Cannot change state from %s to %s for chanel %s", state,
+ newState, channel);
this.state = newState;
}
package org.opendaylight.controller.netconf.nettyutil.handler;
import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.ByteToMessageDecoder;
-
-import java.util.List;
-
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import org.opendaylight.controller.netconf.util.messages.NetconfMessageConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
+public class NetconfEOMAggregator extends DelimiterBasedFrameDecoder {
-public class NetconfEOMAggregator extends ByteToMessageDecoder {
- private final static Logger logger = LoggerFactory.getLogger(NetconfEOMAggregator.class);
+ public static final ByteBuf DELIMITER = Unpooled.wrappedBuffer(NetconfMessageConstants.END_OF_MESSAGE);
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
- int index = indexOfSequence(in, NetconfMessageConstants.END_OF_MESSAGE);
- if (index == -1) {
- logger.debug("Message is not complete, read again.");
- if (logger.isTraceEnabled()) {
- String str = in.toString(Charsets.UTF_8);
- logger.trace("Message read so far: {}", str);
- }
- ctx.read();
- } else {
- ByteBuf msg = in.readBytes(index);
- in.readBytes(NetconfMessageConstants.END_OF_MESSAGE.length);
- in.discardReadBytes();
- logger.debug("Message is complete.");
- out.add(msg);
- }
+ public NetconfEOMAggregator() {
+ super(Integer.MAX_VALUE, DELIMITER);
}
-
- private int indexOfSequence(ByteBuf in, byte[] sequence) {
- int index = -1;
- for (int i = 0; i < in.readableBytes() - sequence.length + 1; i++) {
- if (in.getByte(i) == sequence[0]) {
- index = i;
- for (int j = 1; j < sequence.length; j++) {
- if (in.getByte(i + j) != sequence[j]) {
- index = -1;
- break;
- }
- }
- if (index != -1) {
- return index;
- }
- }
- }
- return index;
- }
-
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
+import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
- StreamResult result = new StreamResult(new OutputStreamWriter(os));
+ // Wrap OutputStreamWriter with BufferedWriter as suggested in javadoc for OutputStreamWriter
+ StreamResult result = new StreamResult(new BufferedWriter(new OutputStreamWriter(os)));
DOMSource source = new DOMSource(msg.getDocument());
transformer.transform(source, result);
}