Bug 4432 - NPE problem in OFEncode
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
index dc378164dc1685a4442424fed0d49409209d8f7c..ebf956f9d37cb3153aee21d0323061fad037bf6a 100644 (file)
@@ -10,9 +10,12 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection;
 import com.google.common.base.Preconditions;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nonnull;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
@@ -23,6 +26,24 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
+    private static enum PipelineState {
+        /**
+         * Netty thread is potentially idle, no assumptions
+         * can be made about its state.
+         */
+        IDLE,
+        /**
+         * Netty thread is currently reading, once the read completes,
+         * if will flush the queue in the {@link #FLUSHING} state.
+         */
+        READING,
+        /**
+         * Netty thread is currently performing a flush on the queue.
+         * It will then transition to {@link #IDLE} state.
+         */
+        WRITING,
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
 
     /**
@@ -46,7 +67,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private final T handler;
 
     // Accessed concurrently
-    private volatile boolean reading;
+    private volatile PipelineState state = PipelineState.IDLE;
 
     // Updated from netty only
     private boolean alreadyReading;
@@ -184,11 +205,13 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     }
 
     private void writeAndFlush() {
+        state = PipelineState.WRITING;
+
         final long start = System.nanoTime();
 
         final int entries = currentQueue.writeEntries(parent.getChannel(), start);
         if (entries > 0) {
-            LOG.debug("Flushing channel {}", parent.getChannel());
+            LOG.trace("Flushing channel {}", parent.getChannel());
             parent.getChannel().flush();
         }
 
@@ -197,6 +220,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             LOG.debug("Flushed {} messages to channel {} in {}us", entries,
                 parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start));
         }
+
+        state = PipelineState.IDLE;
     }
 
     /**
@@ -207,7 +232,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     protected void flush() {
         // If the channel is gone, just flush whatever is not completed
         if (!shuttingDown) {
-            LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
+            LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
             writeAndFlush();
             rescheduleFlush();
         } else if (currentQueue.finishShutdown()) {
@@ -241,6 +266,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         conditionalFlush();
     }
 
+    @Override
     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
         /*
          * Tune channel write buffering. We increase the writability window
@@ -258,14 +284,11 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
         super.channelWritabilityChanged(ctx);
 
-        // A simple trade-off. While we could write things right away, if there is a task
-        // schedule, let it have the work
-        if (flushScheduled.compareAndSet(false, true)) {
-            LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
-            flush();
-        } else {
-            LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
-        }
+        // The channel is writable again. There may be a flush task on the way, but let's
+        // steal its work, potentially decreasing latency. Since there is a window between
+        // now and when it will run, it may still pick up some more work to do.
+        LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
+        writeAndFlush();
     }
 
     @Override
@@ -283,10 +306,13 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
     @Override
     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
-        // non-volatile read if we are called multiple times
+        // Netty does not provide a 'start reading' callback, so this is our first
+        // (and repeated) chance to detect reading. Since this callback can be invoked
+        // multiple times, we keep a boolean we check. That prevents a volatile write
+        // on repeated invocations. It will be cleared in channelReadComplete().
         if (!alreadyReading) {
             alreadyReading = true;
-            reading = true;
+            state = PipelineState.READING;
         }
         super.channelRead(ctx, msg);
     }
@@ -294,25 +320,15 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     @Override
     public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
         super.channelReadComplete(ctx);
-        alreadyReading = false;
-        reading = false;
-
-        // TODO: model this as an atomic gate. We need to sync on it to make sure
-        //       that ensureFlushing() suppresses scheudling only if this barrier
-        //       has not been crossed.
-        synchronized (this) {
-            // Run flush regardless of writability. This is not strictly required, as
-            // there may be a scheduled flush. Instead of canceling it, which is expensive,
-            // we'll steal its work. Note that more work may accumulate in the time window
-            // between now and when the task will run, so it may not be a no-op after all.
-            //
-            // The reason for this is to will the output buffer before we go into selection
-            // phase. This will make sure the pipe is full (in which case our next wake up
-            // will be the queue becoming writable).
-            writeAndFlush();
-        }
 
-        LOG.debug("Opportunistic write on channel {}", parent.getChannel());
+        // Run flush regardless of writability. This is not strictly required, as
+        // there may be a scheduled flush. Instead of canceling it, which is expensive,
+        // we'll steal its work. Note that more work may accumulate in the time window
+        // between now and when the task will run, so it may not be a no-op after all.
+        //
+        // The reason for this is to will the output buffer before we go into selection
+        // phase. This will make sure the pipe is full (in which case our next wake up
+        // will be the queue becoming writable).
         writeAndFlush();
     }
 
@@ -330,22 +346,25 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
 
         // We are currently reading something, just a quick sync to ensure we will in fact
         // flush state.
-        if (reading) {
-            synchronized (this) {
-                if (reading) {
-                    return;
-                }
-            }
+        final PipelineState localState = state;
+        LOG.debug("Synchronize on pipeline state {}", localState);
+        switch (localState) {
+        case READING:
+            // Netty thread is currently reading, it will flush the pipeline once it
+            // finishes reading. This is a no-op situation.
+            break;
+        case WRITING:
+        case IDLE:
+        default:
+            // We cannot rely on the change being flushed, schedule a request
+            scheduleFlush();
         }
-
-        // Netty thread is outside our code, we need to schedule a flush
-        // to re-synchronize.
-        scheduleFlush();
     }
 
     void onEchoRequest(final EchoRequestMessage message) {
-        final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
-        parent.getChannel().writeAndFlush(reply);
+        final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
+                .setVersion(message.getVersion()).setXid(message.getXid()).build();
+        parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply));
     }
 
     /**
@@ -359,12 +378,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      *            adding overhead.
      */
     void writeMessage(final OfHeader message, final long now) {
-        final Object wrapper;
-        if (address == null) {
-            wrapper = new MessageListenerWrapper(message, null);
-        } else {
-            wrapper = new UdpMessageListenerWrapper(message, null, address);
-        }
+        final Object wrapper = makeMessageListenerWrapper(message);
         parent.getChannel().write(wrapper);
 
         if (message instanceof BarrierInput) {
@@ -381,4 +395,33 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             }
         }
     }
+
+    /**
+     * Wraps outgoing message and includes listener attached to this message
+     * which is send to OFEncoder for serialization. Correct wrapper is
+     * selected by communication pipeline.
+     *
+     * @return
+     */
+    private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
+        Preconditions.checkArgument(msg != null);
+
+        if (address == null) {
+            return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
+        }
+        return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address);
+    }
+
+    /* NPE are coming from {@link OFEncoder#encode} from catch block and we don't wish to lost it */
+    private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = new GenericFutureListener<Future<Void>>() {
+
+        private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener");
+
+        @Override
+        public void operationComplete(final Future<Void> future) throws Exception {
+            if (future.cause() != null) {
+                LOGGER.warn("Message encoding fail !", future.cause());
+            }
+        }
+    };
 }