Bug 4432 - NPE problem in OFEncode 39/28039/2
authorVaclav Demcak <vdemcak@cisco.com>
Thu, 8 Oct 2015 02:51:23 +0000 (04:51 +0200)
committerMichal Polkorab <michal.polkorab@pantheon.sk>
Fri, 9 Oct 2015 13:16:17 +0000 (13:16 +0000)
* add check for listener in msg wrapper
* add default listener for logging in OutboundQueueManager

Change-Id: Ib652d60a210feaddf96840334af3b435e7d0a14f
Signed-off-by: Vaclav Demcak <vdemcak@cisco.com>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/OFEncoder.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java

index 075b99da2bb30bddaca8c1fc4707cdd18d44b1eb..4c54732bda6ed0b91274dc56858d921bae13620e 100644 (file)
@@ -12,7 +12,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 import io.netty.util.concurrent.Future;
-
 import org.opendaylight.openflowjava.protocol.impl.core.connection.MessageListenerWrapper;
 import org.opendaylight.openflowjava.protocol.impl.serialization.SerializationFactory;
 import org.opendaylight.openflowjava.statistics.CounterEventTypes;
@@ -28,31 +27,33 @@ import org.slf4j.LoggerFactory;
  */
 public class OFEncoder extends MessageToByteEncoder<MessageListenerWrapper> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(OFEncoder.class);
+    private static final Logger LOG = LoggerFactory.getLogger(OFEncoder.class);
     private SerializationFactory serializationFactory;
-    private StatisticsCounters statisticsCounters;
+    private final StatisticsCounters statisticsCounters;
 
     /** Constructor of class */
     public OFEncoder() {
         statisticsCounters = StatisticsCounters.getInstance();
-        LOGGER.trace("Creating OF13Encoder");
+        LOG.trace("Creating OF13Encoder");
     }
 
     @Override
-    protected void encode(ChannelHandlerContext ctx, MessageListenerWrapper wrapper, ByteBuf out)
+    protected void encode(final ChannelHandlerContext ctx, final MessageListenerWrapper wrapper, final ByteBuf out)
             throws Exception {
-        LOGGER.trace("Encoding");
+        LOG.trace("Encoding");
         try {
             serializationFactory.messageToBuffer(wrapper.getMsg().getVersion(), out, wrapper.getMsg());
             if(wrapper.getMsg() instanceof FlowModInput){
                 statisticsCounters.incrementCounter(CounterEventTypes.DS_FLOW_MODS_SENT);
             }
             statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_SUCCESS);
-        } catch(Exception e) {
-            LOGGER.warn("Message serialization failed ", e);
+        } catch(final Exception e) {
+            LOG.warn("Message serialization failed ", e);
             statisticsCounters.incrementCounter(CounterEventTypes.DS_ENCODE_FAIL);
-            Future<Void> newFailedFuture = ctx.newFailedFuture(e);
-            wrapper.getListener().operationComplete(newFailedFuture);
+            if (wrapper.getListener() != null) {
+                final Future<Void> newFailedFuture = ctx.newFailedFuture(e);
+                wrapper.getListener().operationComplete(newFailedFuture);
+            }
             out.clear();
             return;
         }
@@ -61,7 +62,7 @@ public class OFEncoder extends MessageToByteEncoder<MessageListenerWrapper> {
     /**
      * @param serializationFactory
      */
-    public void setSerializationFactory(SerializationFactory serializationFactory) {
+    public void setSerializationFactory(final SerializationFactory serializationFactory) {
         this.serializationFactory = serializationFactory;
     }
 
index 8e1061b2927b1d3623f8eb38527505c7f7effc27..ebf956f9d37cb3153aee21d0323061fad037bf6a 100644 (file)
@@ -8,7 +8,6 @@
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
-import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.Future;
@@ -16,6 +15,7 @@ import io.netty.util.concurrent.GenericFutureListener;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nonnull;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
@@ -266,6 +266,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         conditionalFlush();
     }
 
+    @Override
     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
         /*
          * Tune channel write buffering. We increase the writability window
@@ -363,19 +364,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     void onEchoRequest(final EchoRequestMessage message) {
         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
                 .setVersion(message.getVersion()).setXid(message.getXid()).build();
-        final SimpleRpcListener simpleMsgRpcListener = new SimpleRpcListener(reply, "echo-reply sending failed");
-
-        final GenericFutureListener<Future<Void>> msgProcessListener = simpleMsgRpcListener.takeListener();
-        final Object messageListenerWrapper;
-        if (address == null) {
-            messageListenerWrapper = new MessageListenerWrapper(simpleMsgRpcListener.takeMessage(), msgProcessListener);
-        } else {
-            messageListenerWrapper = new UdpMessageListenerWrapper(simpleMsgRpcListener.takeMessage(), msgProcessListener, address);
-        }
-        final ChannelFuture channelFuture = parent.getChannel().writeAndFlush(messageListenerWrapper);
-        if (msgProcessListener != null) {
-            channelFuture.addListener(msgProcessListener);
-        }
+        parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply));
     }
 
     /**
@@ -389,12 +378,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      *            adding overhead.
      */
     void writeMessage(final OfHeader message, final long now) {
-        final Object wrapper;
-        if (address == null) {
-            wrapper = new MessageListenerWrapper(message, null);
-        } else {
-            wrapper = new UdpMessageListenerWrapper(message, null, address);
-        }
+        final Object wrapper = makeMessageListenerWrapper(message);
         parent.getChannel().write(wrapper);
 
         if (message instanceof BarrierInput) {
@@ -411,4 +395,33 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             }
         }
     }
+
+    /**
+     * Wraps outgoing message and includes listener attached to this message
+     * which is send to OFEncoder for serialization. Correct wrapper is
+     * selected by communication pipeline.
+     *
+     * @return
+     */
+    private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
+        Preconditions.checkArgument(msg != null);
+
+        if (address == null) {
+            return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
+        }
+        return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address);
+    }
+
+    /* NPE are coming from {@link OFEncoder#encode} from catch block and we don't wish to lost it */
+    private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = new GenericFutureListener<Future<Void>>() {
+
+        private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener");
+
+        @Override
+        public void operationComplete(final Future<Void> future) throws Exception {
+            if (future.cause() != null) {
+                LOGGER.warn("Message encoding fail !", future.cause());
+            }
+        }
+    };
 }