From 22d96bf79e25f975b81cd8e95f1ec61378118976 Mon Sep 17 00:00:00 2001 From: Vaclav Demcak Date: Thu, 8 Oct 2015 04:51:23 +0200 Subject: [PATCH] Bug 4432 - NPE problem in OFEncode * add check for listener in msg wrapper * add default listener for logging in OutboundQueueManager Change-Id: Ib652d60a210feaddf96840334af3b435e7d0a14f Signed-off-by: Vaclav Demcak --- .../protocol/impl/core/OFEncoder.java | 23 ++++---- .../core/connection/OutboundQueueManager.java | 53 ++++++++++++------- 2 files changed, 45 insertions(+), 31 deletions(-) diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/OFEncoder.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/OFEncoder.java index 075b99da..4c54732b 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/OFEncoder.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/OFEncoder.java @@ -12,7 +12,6 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.concurrent.Future; - import org.opendaylight.openflowjava.protocol.impl.core.connection.MessageListenerWrapper; import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory; import org.opendaylight.openflowjava.statistics.CounterEventTypes; @@ -28,31 +27,33 @@ import org.slf4j.LoggerFactory; */ public class OFEncoder extends MessageToByteEncoder { - private static final Logger LOGGER = LoggerFactory.getLogger(OFEncoder.class); + private static final Logger LOG = LoggerFactory.getLogger(OFEncoder.class); private SerializationFactory serializationFactory; - private StatisticsCounters statisticsCounters; + private final StatisticsCounters statisticsCounters; /** Constructor of class */ public OFEncoder() { statisticsCounters = StatisticsCounters.getInstance(); - LOGGER.trace("Creating OF13Encoder"); + LOG.trace("Creating OF13Encoder"); } @Override - protected void encode(ChannelHandlerContext ctx, MessageListenerWrapper wrapper, ByteBuf out) + protected void encode(final ChannelHandlerContext ctx, final MessageListenerWrapper wrapper, final ByteBuf out) throws Exception { - LOGGER.trace("Encoding"); + LOG.trace("Encoding"); try { serializationFactory.messageToBuffer(wrapper.getMsg().getVersion(), out, wrapper.getMsg()); if(wrapper.getMsg() instanceof FlowModInput){ statisticsCounters.incrementCounter(CounterEventTypes.DS_FLOW_MODS_SENT); } statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_SUCCESS); - } catch(Exception e) { - LOGGER.warn("Message serialization failed ", e); + } catch(final Exception e) { + LOG.warn("Message serialization failed ", e); statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_FAIL); - Future newFailedFuture = ctx.newFailedFuture(e); - wrapper.getListener().operationComplete(newFailedFuture); + if (wrapper.getListener() != null) { + final Future newFailedFuture = ctx.newFailedFuture(e); + wrapper.getListener().operationComplete(newFailedFuture); + } out.clear(); return; } @@ -61,7 +62,7 @@ public class OFEncoder extends MessageToByteEncoder { /** * @param serializationFactory */ - public void setSerializationFactory(SerializationFactory serializationFactory) { + public void setSerializationFactory(final SerializationFactory serializationFactory) { this.serializationFactory = serializationFactory; } diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java index 8e1061b2..ebf956f9 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java @@ -8,7 +8,6 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection; import com.google.common.base.Preconditions; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.concurrent.Future; @@ -16,6 +15,7 @@ import io.netty.util.concurrent.GenericFutureListener; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nonnull; import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput; import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput; @@ -266,6 +266,7 @@ final class OutboundQueueManager extends Channel conditionalFlush(); } + @Override public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { /* * Tune channel write buffering. We increase the writability window @@ -363,19 +364,7 @@ final class OutboundQueueManager extends Channel void onEchoRequest(final EchoRequestMessage message) { final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()) .setVersion(message.getVersion()).setXid(message.getXid()).build(); - final SimpleRpcListener simpleMsgRpcListener = new SimpleRpcListener(reply, "echo-reply sending failed"); - - final GenericFutureListener> msgProcessListener = simpleMsgRpcListener.takeListener(); - final Object messageListenerWrapper; - if (address == null) { - messageListenerWrapper = new MessageListenerWrapper(simpleMsgRpcListener.takeMessage(), msgProcessListener); - } else { - messageListenerWrapper = new UdpMessageListenerWrapper(simpleMsgRpcListener.takeMessage(), msgProcessListener, address); - } - final ChannelFuture channelFuture = parent.getChannel().writeAndFlush(messageListenerWrapper); - if (msgProcessListener != null) { - channelFuture.addListener(msgProcessListener); - } + parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply)); } /** @@ -389,12 +378,7 @@ final class OutboundQueueManager extends Channel * adding overhead. */ void writeMessage(final OfHeader message, final long now) { - final Object wrapper; - if (address == null) { - wrapper = new MessageListenerWrapper(message, null); - } else { - wrapper = new UdpMessageListenerWrapper(message, null, address); - } + final Object wrapper = makeMessageListenerWrapper(message); parent.getChannel().write(wrapper); if (message instanceof BarrierInput) { @@ -411,4 +395,33 @@ final class OutboundQueueManager extends Channel } } } + + /** + * Wraps outgoing message and includes listener attached to this message + * which is send to OFEncoder for serialization. Correct wrapper is + * selected by communication pipeline. + * + * @return + */ + private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) { + Preconditions.checkArgument(msg != null); + + if (address == null) { + return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER); + } + return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address); + } + + /* NPE are coming from {@link OFEncoder#encode} from catch block and we don't wish to lost it */ + private static final GenericFutureListener> LOG_ENCODER_LISTENER = new GenericFutureListener>() { + + private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener"); + + @Override + public void operationComplete(final Future future) throws Exception { + if (future.cause() != null) { + LOGGER.warn("Message encoding fail !", future.cause()); + } + } + }; } -- 2.36.6