Bug 4432 - NPE problem in OFEncode
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
index 4864b95d655d8e63a2dc9851a364f060eadee1d5..ebf956f9d37cb3153aee21d0323061fad037bf6a 100644 (file)
@@ -10,58 +10,71 @@ package org.opendaylight.openflowjava.protocol.impl.core.connection;
 import com.google.common.base.Preconditions;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nonnull;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
-    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
+    private static enum PipelineState {
+        /**
+         * Netty thread is potentially idle, no assumptions
+         * can be made about its state.
+         */
+        IDLE,
+        /**
+         * Netty thread is currently reading, once the read completes,
+         * if will flush the queue in the {@link #FLUSHING} state.
+         */
+        READING,
+        /**
+         * Netty thread is currently performing a flush on the queue.
+         * It will then transition to {@link #IDLE} state.
+         */
+        WRITING,
+    }
 
-    /**
-     * This is the default upper bound we place on the flush task running
-     * a single iteration. We relinquish control after about this amount
-     * of time.
-     */
-    private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
+    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
 
     /**
-     * We re-check the time spent flushing every this many messages. We do this because
-     * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
-     * or similar to disable the feature.
+     * Default low write watermark. Channel will become writable when number of outstanding
+     * bytes dips below this value.
      */
-    private static final int WORKTIME_RECHECK_MSGS = 64;
+    private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
 
     /**
-     * We maintain a cache of this many previous queues for later reuse.
+     * Default write high watermark. Channel will become un-writable when number of
+     * outstanding bytes hits this value.
      */
-    private static final int QUEUE_CACHE_SIZE = 4;
+    private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
 
-    private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
-    private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
     private final AtomicBoolean flushScheduled = new AtomicBoolean();
+    private final StackedOutboundQueue currentQueue;
     private final ConnectionAdapterImpl parent;
     private final InetSocketAddress address;
+    private final int maxNonBarrierMessages;
     private final long maxBarrierNanos;
-    private final long maxWorkTime;
-    private final int queueSize;
     private final T handler;
 
+    // Accessed concurrently
+    private volatile PipelineState state = PipelineState.IDLE;
+
     // Updated from netty only
-    private long lastBarrierNanos = System.nanoTime();
-    private OutboundQueueImpl currentQueue;
+    private boolean alreadyReading;
     private boolean barrierTimerEnabled;
+    private long lastBarrierNanos = System.nanoTime();
     private int nonBarrierMessages;
-    private long lastXid = 0;
+    private boolean shuttingDown;
 
     // Passed to executor to request triggering of flush
     private final Runnable flushRunnable = new Runnable() {
@@ -70,6 +83,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             flush();
         }
     };
+
+    // Passed to executor to request a periodic barrier check
     private final Runnable barrierRunnable = new Runnable() {
         @Override
         public void run() {
@@ -78,18 +93,18 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     };
 
     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
-        final int queueSize, final long maxBarrierNanos) {
+        final int maxNonBarrierMessages, final long maxBarrierNanos) {
         this.parent = Preconditions.checkNotNull(parent);
         this.handler = Preconditions.checkNotNull(handler);
-        Preconditions.checkArgument(queueSize > 0);
-        this.queueSize = queueSize;
+        Preconditions.checkArgument(maxNonBarrierMessages > 0);
+        this.maxNonBarrierMessages = maxNonBarrierMessages;
         Preconditions.checkArgument(maxBarrierNanos > 0);
         this.maxBarrierNanos = maxBarrierNanos;
         this.address = address;
-        this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
 
-        LOG.debug("Queue manager instantiated with queue size {}", queueSize);
-        createQueue();
+        currentQueue = new StackedOutboundQueue(this);
+        LOG.debug("Queue manager instantiated with queue {}", currentQueue);
+        handler.onConnectionQueueChanged(currentQueue);
     }
 
     T getHandler() {
@@ -101,33 +116,6 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         handler.onConnectionQueueChanged(null);
     }
 
-    private void retireQueue(final OutboundQueueImpl queue) {
-        if (queueCache.offer(queue)) {
-            LOG.trace("Saving queue {} for later reuse", queue);
-        } else {
-            LOG.trace("Queue {} thrown away", queue);
-        }
-    }
-
-    private void createQueue() {
-        final long baseXid = lastXid;
-        lastXid += queueSize + 1;
-
-        final OutboundQueueImpl cached = queueCache.poll();
-        final OutboundQueueImpl queue;
-        if (cached != null) {
-            queue = cached.reuse(baseXid);
-            LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
-        } else {
-            queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
-            LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
-        }
-
-        activeQueues.add(queue);
-        currentQueue = queue;
-        handler.onConnectionQueueChanged(queue);
-    }
-
     private void scheduleBarrierTimer(final long now) {
         long next = lastBarrierNanos + maxBarrierNanos;
         if (next < now) {
@@ -152,45 +140,6 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         LOG.trace("Barrier XID {} scheduled", xid);
     }
 
-    /**
-     * Flush an entry from the queue.
-     *
-     * @param now Time reference for 'now'. We take this as an argument, as
-     *            we need a timestamp to mark barrier messages we see swinging
-     *            by. That timestamp does not need to be completely accurate,
-     *            hence we use the flush start time. Alternative would be to
-     *            measure System.nanoTime() for each barrier -- needlessly
-     *            adding overhead.
-     *
-     * @return Entry which was flushed, null if no entry is ready.
-     */
-    OfHeader flushEntry(final long now) {
-        final OfHeader message = currentQueue.flushEntry();
-        if (currentQueue.isFlushed()) {
-            LOG.debug("Queue {} is fully flushed", currentQueue);
-            createQueue();
-        }
-
-        if (message == null) {
-            return null;
-        }
-
-        if (message instanceof BarrierInput) {
-            LOG.trace("Barrier message seen, resetting counters");
-            nonBarrierMessages = 0;
-            lastBarrierNanos = now;
-        } else {
-            nonBarrierMessages++;
-            if (nonBarrierMessages >= queueSize) {
-                LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
-                scheduleBarrierMessage();
-            } else if (!barrierTimerEnabled) {
-                scheduleBarrierTimer(now);
-            }
-        }
-
-        return message;
-    }
 
     /**
      * Invoked whenever a message comes in from the switch. Runs matching
@@ -202,77 +151,25 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     boolean onMessage(final OfHeader message) {
         LOG.trace("Attempting to pair message {} to a request", message);
 
-        Iterator<OutboundQueueImpl> it = activeQueues.iterator();
-        while (it.hasNext()) {
-            final OutboundQueueImpl queue = it.next();
-            final OutboundQueueEntry entry = queue.pairRequest(message);
-
-            if (entry == null) {
-                continue;
-            }
-
-            LOG.trace("Queue {} accepted response {}", queue, message);
-
-            // This has been a barrier request, we need to flush all
-            // previous queues
-            if (entry.isBarrier() && activeQueues.size() > 1) {
-                LOG.trace("Queue {} indicated request was a barrier", queue);
-
-                it = activeQueues.iterator();
-                while (it.hasNext()) {
-                    final OutboundQueueImpl q = it.next();
-
-                    // We want to complete all queues before the current one, we will
-                    // complete the current queue below
-                    if (!queue.equals(q)) {
-                        LOG.trace("Queue {} is implied finished", q);
-                        q.completeAll();
-                        it.remove();
-                        retireQueue(q);
-                    } else {
-                        break;
-                    }
-                }
-            }
-
-            if (queue.isFinished()) {
-                LOG.trace("Queue {} is finished", queue);
-                it.remove();
-                retireQueue(queue);
-            }
-
-            return true;
-        }
-
-        LOG.debug("Failed to find completion for message {}", message);
-        return false;
+        return currentQueue.pairRequest(message);
     }
 
     private void scheduleFlush() {
-        if (parent.getChannel().isWritable()) {
-            if (flushScheduled.compareAndSet(false, true)) {
-                LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
-                parent.getChannel().eventLoop().execute(flushRunnable);
-            } else {
-                LOG.trace("Flush task is already present on channel {}", parent.getChannel());
-            }
+        if (flushScheduled.compareAndSet(false, true)) {
+            LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+            parent.getChannel().eventLoop().execute(flushRunnable);
         } else {
-            LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+            LOG.trace("Flush task is already present on channel {}", parent.getChannel());
         }
     }
 
-    void ensureFlushing(final OutboundQueueImpl queue) {
-        Preconditions.checkState(currentQueue.equals(queue));
-        scheduleFlush();
-    }
-
     /**
      * Periodic barrier check.
      */
     protected void barrier() {
         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
         barrierTimerEnabled = false;
-        if (currentQueue == null) {
+        if (shuttingDown) {
             LOG.trace("Channel shut down, not processing barrier");
             return;
         }
@@ -290,59 +187,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         }
     }
 
-    /**
-     * Perform a single flush operation.
-     */
-    protected void flush() {
-        final long start = System.nanoTime();
-        final long deadline = start + maxWorkTime;
-
-        LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
-
-        long messages = 0;
-        for (;; ++messages) {
-            if (!parent.getChannel().isWritable()) {
-                LOG.trace("Channel is no longer writable");
-                break;
-            }
-
-            final OfHeader message = flushEntry(start);
-            if (message == null) {
-                LOG.trace("The queue is completely drained");
-                break;
-            }
-
-            final Object wrapper;
-            if (address == null) {
-                wrapper = new MessageListenerWrapper(message, null);
-            } else {
-                wrapper = new UdpMessageListenerWrapper(message, null, address);
-            }
-            parent.getChannel().write(wrapper);
-
-            /*
-             * Check every WORKTIME_RECHECK_MSGS for exceeded time.
-             *
-             * XXX: given we already measure our flushing throughput, we
-             *      should be able to perform dynamic adjustments here.
-             *      is that additional complexity needed, though?
-             */
-            if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
-                LOG.trace("Exceeded allotted work time {}us",
-                        TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
-                break;
-            }
-        }
-
-        if (messages > 0) {
-            LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
-            parent.getChannel().flush();
-        }
-
-        final long stop = System.nanoTime();
-        LOG.debug("Flushed {} messages in {}us to channel {}",
-                messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
-
+    private void rescheduleFlush() {
         /*
          * We are almost ready to terminate. This is a bit tricky, because
          * we do not want to have a race window where a message would be
@@ -359,55 +204,224 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         conditionalFlush();
     }
 
+    private void writeAndFlush() {
+        state = PipelineState.WRITING;
+
+        final long start = System.nanoTime();
+
+        final int entries = currentQueue.writeEntries(parent.getChannel(), start);
+        if (entries > 0) {
+            LOG.trace("Flushing channel {}", parent.getChannel());
+            parent.getChannel().flush();
+        }
+
+        if (LOG.isDebugEnabled()) {
+            final long stop = System.nanoTime();
+            LOG.debug("Flushed {} messages to channel {} in {}us", entries,
+                parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start));
+        }
+
+        state = PipelineState.IDLE;
+    }
+
+    /**
+     * Perform a single flush operation. We keep it here so we do not generate
+     * syntetic accessors for private fields. Otherwise it could be moved into
+     * {@link #flushRunnable}.
+     */
+    protected void flush() {
+        // If the channel is gone, just flush whatever is not completed
+        if (!shuttingDown) {
+            LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
+            writeAndFlush();
+            rescheduleFlush();
+        } else if (currentQueue.finishShutdown()) {
+            handler.onConnectionQueueChanged(null);
+            LOG.debug("Channel {} shutdown complete", parent.getChannel());
+        } else {
+            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+            rescheduleFlush();
+        }
+    }
 
     /**
      * Schedule a queue flush if it is not empty and the channel is found
      * to be writable. May only be called from Netty context.
      */
     private void conditionalFlush() {
-        if (!currentQueue.isEmpty()) {
-            scheduleFlush();
+        if (currentQueue.needsFlush()) {
+            if (shuttingDown || parent.getChannel().isWritable()) {
+                scheduleFlush();
+            } else {
+                LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
+            }
         } else {
             LOG.trace("Queue is empty, no flush needed");
         }
     }
 
-    private void conditionalFlush(final ChannelHandlerContext ctx) {
-        Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
+    @Override
+    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
+        super.channelActive(ctx);
         conditionalFlush();
     }
 
     @Override
-    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
-        super.channelActive(ctx);
-        conditionalFlush(ctx);
+    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
+        /*
+         * Tune channel write buffering. We increase the writability window
+         * to ensure we can flush an entire queue segment in one go. We definitely
+         * want to keep the difference above 64k, as that will ensure we use jam-packed
+         * TCP packets. UDP will fragment as appropriate.
+         */
+        ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
+        ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
+
+        super.handlerAdded(ctx);
     }
 
     @Override
     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
         super.channelWritabilityChanged(ctx);
-        conditionalFlush(ctx);
+
+        // The channel is writable again. There may be a flush task on the way, but let's
+        // steal its work, potentially decreasing latency. Since there is a window between
+        // now and when it will run, it may still pick up some more work to do.
+        LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
+        writeAndFlush();
     }
 
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        long entries = 0;
-        LOG.debug("Channel shutdown, flushing queue...");
-        handler.onConnectionQueueChanged(null);
+        LOG.debug("Channel {} initiating shutdown...", ctx.channel());
+
+        shuttingDown = true;
+        final long entries = currentQueue.startShutdown(ctx.channel());
+        LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
 
-        final Throwable cause = new RejectedExecutionException("Channel disconnected");
-        for (OutboundQueueImpl queue : activeQueues) {
-            entries += queue.failAll(cause);
+        scheduleFlush();
+    }
+
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
+        // Netty does not provide a 'start reading' callback, so this is our first
+        // (and repeated) chance to detect reading. Since this callback can be invoked
+        // multiple times, we keep a boolean we check. That prevents a volatile write
+        // on repeated invocations. It will be cleared in channelReadComplete().
+        if (!alreadyReading) {
+            alreadyReading = true;
+            state = PipelineState.READING;
         }
-        activeQueues.clear();
+        super.channelRead(ctx, msg);
+    }
 
-        LOG.debug("Flushed {} queue entries", entries);
+    @Override
+    public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
+        super.channelReadComplete(ctx);
+
+        // Run flush regardless of writability. This is not strictly required, as
+        // there may be a scheduled flush. Instead of canceling it, which is expensive,
+        // we'll steal its work. Note that more work may accumulate in the time window
+        // between now and when the task will run, so it may not be a no-op after all.
+        //
+        // The reason for this is to will the output buffer before we go into selection
+        // phase. This will make sure the pipe is full (in which case our next wake up
+        // will be the queue becoming writable).
+        writeAndFlush();
     }
 
     @Override
     public String toString() {
         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
     }
+
+    void ensureFlushing() {
+        // If the channel is not writable, there's no point in waking up,
+        // once we become writable, we will run a full flush
+        if (!parent.getChannel().isWritable()) {
+            return;
+        }
+
+        // We are currently reading something, just a quick sync to ensure we will in fact
+        // flush state.
+        final PipelineState localState = state;
+        LOG.debug("Synchronize on pipeline state {}", localState);
+        switch (localState) {
+        case READING:
+            // Netty thread is currently reading, it will flush the pipeline once it
+            // finishes reading. This is a no-op situation.
+            break;
+        case WRITING:
+        case IDLE:
+        default:
+            // We cannot rely on the change being flushed, schedule a request
+            scheduleFlush();
+        }
+    }
+
+    void onEchoRequest(final EchoRequestMessage message) {
+        final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
+                .setVersion(message.getVersion()).setXid(message.getXid()).build();
+        parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply));
+    }
+
+    /**
+     * Write a message into the underlying channel.
+     *
+     * @param now Time reference for 'now'. We take this as an argument, as
+     *            we need a timestamp to mark barrier messages we see swinging
+     *            by. That timestamp does not need to be completely accurate,
+     *            hence we use the flush start time. Alternative would be to
+     *            measure System.nanoTime() for each barrier -- needlessly
+     *            adding overhead.
+     */
+    void writeMessage(final OfHeader message, final long now) {
+        final Object wrapper = makeMessageListenerWrapper(message);
+        parent.getChannel().write(wrapper);
+
+        if (message instanceof BarrierInput) {
+            LOG.trace("Barrier message seen, resetting counters");
+            nonBarrierMessages = 0;
+            lastBarrierNanos = now;
+        } else {
+            nonBarrierMessages++;
+            if (nonBarrierMessages >= maxNonBarrierMessages) {
+                LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
+                scheduleBarrierMessage();
+            } else if (!barrierTimerEnabled) {
+                scheduleBarrierTimer(now);
+            }
+        }
+    }
+
+    /**
+     * Wraps outgoing message and includes listener attached to this message
+     * which is send to OFEncoder for serialization. Correct wrapper is
+     * selected by communication pipeline.
+     *
+     * @return
+     */
+    private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
+        Preconditions.checkArgument(msg != null);
+
+        if (address == null) {
+            return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
+        }
+        return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address);
+    }
+
+    /* NPE are coming from {@link OFEncoder#encode} from catch block and we don't wish to lost it */
+    private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = new GenericFutureListener<Future<Void>>() {
+
+        private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener");
+
+        @Override
+        public void operationComplete(final Future<Void> future) throws Exception {
+            if (future.cause() != null) {
+                LOGGER.warn("Message encoding fail !", future.cause());
+            }
+        }
+    };
 }