Tune write low/highwatermark
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
index b7c2c8f9efd57511e03964082dd89855a7dd9775..66fa8a60ca799fe9772bfd5db0e65bceb0f43382 100644 (file)
@@ -8,19 +8,20 @@
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,33 +44,34 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private static final int WORKTIME_RECHECK_MSGS = 64;
 
     /**
-     * We maintain a cache of this many previous queues for later reuse.
+     * Default low write watermark. Channel will become writable when number of outstanding
+     * bytes dips below this value.
      */
-    private static final int QUEUE_CACHE_SIZE = 4;
+    private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
+
+    /**
+     * Default write high watermark. Channel will become un-writable when number of
+     * outstanding bytes hits this value.
+     */
+    private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
+
 
-    private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
+    private final AtomicBoolean flushScheduled = new AtomicBoolean();
     private final ConnectionAdapterImpl parent;
     private final InetSocketAddress address;
     private final long maxBarrierNanos;
     private final long maxWorkTime;
-    private final int queueSize;
     private final T handler;
 
-    /*
-     * Instead of using an AtomicBoolean object, we use these two. It saves us
-     * from allocating an extra object.
-     */
-    @SuppressWarnings("rawtypes")
-    private static final AtomicIntegerFieldUpdater<OutboundQueueManager> FLUSH_SCHEDULED_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(OutboundQueueManager.class, "flushScheduled");
-    private volatile int flushScheduled = 0;
-
     // Updated from netty only
     private long lastBarrierNanos = System.nanoTime();
+    private OutboundQueueCacheSlice slice;
     private OutboundQueueImpl currentQueue;
+    private boolean barrierTimerEnabled;
     private int nonBarrierMessages;
     private long lastXid = 0;
+    private Integer shutdownOffset;
 
     // Passed to executor to request triggering of flush
     private final Runnable flushRunnable = new Runnable() {
@@ -78,6 +80,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             flush();
         }
     };
+
+    // Passed to executor to request a periodic barrier check
     private final Runnable barrierRunnable = new Runnable() {
         @Override
         public void run() {
@@ -86,19 +90,17 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     };
 
     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
-        final int queueSize, final long maxBarrierNanos) {
+        final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
         this.parent = Preconditions.checkNotNull(parent);
         this.handler = Preconditions.checkNotNull(handler);
-        Preconditions.checkArgument(queueSize > 0);
-        this.queueSize = queueSize;
+        this.slice = Preconditions.checkNotNull(slice);
         Preconditions.checkArgument(maxBarrierNanos > 0);
         this.maxBarrierNanos = maxBarrierNanos;
         this.address = address;
         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
 
-        LOG.debug("Queue manager instantiated with queue size {}", queueSize);
+        LOG.debug("Queue manager instantiated with queue slice {}", slice);
         createQueue();
-        scheduleBarrierTimer(lastBarrierNanos);
     }
 
     T getHandler() {
@@ -108,30 +110,17 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     @Override
     public void close() {
         handler.onConnectionQueueChanged(null);
-    }
-
-    private void retireQueue(final OutboundQueueImpl queue) {
-        if (queueCache.offer(queue)) {
-            LOG.debug("Saving queue {} for later reuse", queue);
-        } else {
-            LOG.debug("Queue {} thrown away", queue);
+        if (slice != null) {
+            slice.decRef();
+            slice = null;
         }
     }
 
     private void createQueue() {
         final long baseXid = lastXid;
-        lastXid += queueSize + 1;
-
-        final OutboundQueueImpl cached = queueCache.poll();
-        final OutboundQueueImpl queue;
-        if (cached != null) {
-            queue = cached.reuse(baseXid);
-            LOG.debug("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
-        } else {
-            queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
-            LOG.debug("Allocated new queue {} on channel {}", queue, parent.getChannel());
-        }
+        lastXid += slice.getQueueSize() + 1;
 
+        final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
         activeQueues.add(queue);
         currentQueue = queue;
         handler.onConnectionQueueChanged(queue);
@@ -140,28 +129,25 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private void scheduleBarrierTimer(final long now) {
         long next = lastBarrierNanos + maxBarrierNanos;
         if (next < now) {
-            LOG.debug("Attempted to schedule barrier in the past, reset maximum)");
+            LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
             next = now + maxBarrierNanos;
         }
 
         final long delay = next - now;
-        LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
+        LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
+        barrierTimerEnabled = true;
     }
 
     private void scheduleBarrierMessage() {
-        final Long xid = currentQueue.reserveEntry(true);
-        Verify.verifyNotNull(xid);
+        final Long xid = currentQueue.reserveBarrierIfNeeded();
+        if (xid == null) {
+            LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
+            return;
+        }
 
         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
-        LOG.debug("Barrier XID {} scheduled", xid);
-
-        // We can see into the future when compared to flushEntry(), as that
-        // codepath may be lagging behind on messages. Resetting the counter
-        // here ensures that flushEntry() will not attempt to issue a flush
-        // request. Note that we do not reset current time, as that should
-        // reflect when we sent the message for real.
-        nonBarrierMessages = 0;
+        LOG.trace("Barrier XID {} scheduled", xid);
     }
 
     /**
@@ -188,14 +174,16 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         }
 
         if (message instanceof BarrierInput) {
-            LOG.debug("Barrier message seen, resetting counters");
+            LOG.trace("Barrier message seen, resetting counters");
             nonBarrierMessages = 0;
             lastBarrierNanos = now;
         } else {
             nonBarrierMessages++;
-            if (nonBarrierMessages >= queueSize) {
-                LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
+            if (nonBarrierMessages >= slice.getQueueSize()) {
+                LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
                 scheduleBarrierMessage();
+            } else if (!barrierTimerEnabled) {
+                scheduleBarrierTimer(now);
             }
         }
 
@@ -210,7 +198,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      * @return True if the message matched a previous request, false otherwise.
      */
     boolean onMessage(final OfHeader message) {
-        LOG.debug("Attempting to pair message {} to a request", message);
+        LOG.trace("Attempting to pair message {} to a request", message);
 
         Iterator<OutboundQueueImpl> it = activeQueues.iterator();
         while (it.hasNext()) {
@@ -221,12 +209,12 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
                 continue;
             }
 
-            LOG.debug("Queue {} accepted response {}", queue, message);
+            LOG.trace("Queue {} accepted response {}", queue, message);
 
             // This has been a barrier request, we need to flush all
             // previous queues
             if (entry.isBarrier() && activeQueues.size() > 1) {
-                LOG.debug("Queue {} indicated request was a barrier", queue);
+                LOG.trace("Queue {} indicated request was a barrier", queue);
 
                 it = activeQueues.iterator();
                 while (it.hasNext()) {
@@ -235,10 +223,10 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
                     // We want to complete all queues before the current one, we will
                     // complete the current queue below
                     if (!queue.equals(q)) {
-                        LOG.debug("Queue {} is implied finished", q);
+                        LOG.trace("Queue {} is implied finished", q);
                         q.completeAll();
                         it.remove();
-                        retireQueue(q);
+                        slice.putQueue(q);
                     } else {
                         break;
                     }
@@ -246,9 +234,9 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             }
 
             if (queue.isFinished()) {
-                LOG.debug("Queue {} is finished", queue);
+                LOG.trace("Queue {} is finished", queue);
                 it.remove();
-                retireQueue(queue);
+                slice.putQueue(queue);
             }
 
             return true;
@@ -259,15 +247,11 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     }
 
     private void scheduleFlush() {
-        if (parent.getChannel().isWritable()) {
-            if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
-                LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
-                parent.getChannel().eventLoop().execute(flushRunnable);
-            } else {
-                LOG.trace("Flush task is already present on channel {}", parent.getChannel());
-            }
+        if (flushScheduled.compareAndSet(false, true)) {
+            LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+            parent.getChannel().eventLoop().execute(flushRunnable);
         } else {
-            LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+            LOG.trace("Flush task is already present on channel {}", parent.getChannel());
         }
     }
 
@@ -281,8 +265,9 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      */
     protected void barrier() {
         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
-        if (currentQueue == null) {
-            LOG.debug("Channel shut down, not processing barrier");
+        barrierTimerEnabled = false;
+        if (shutdownOffset != null) {
+            LOG.trace("Channel shut down, not processing barrier");
             return;
         }
 
@@ -292,19 +277,70 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
             // FIXME: we should be tracking requests/responses instead of this
             if (nonBarrierMessages == 0) {
-                LOG.debug("No messages written since last barrier, not issuing one");
+                LOG.trace("No messages written since last barrier, not issuing one");
             } else {
                 scheduleBarrierMessage();
             }
         }
+    }
+
+    private void rescheduleFlush() {
+        /*
+         * We are almost ready to terminate. This is a bit tricky, because
+         * we do not want to have a race window where a message would be
+         * stuck on the queue without a flush being scheduled.
+         *
+         * So we mark ourselves as not running and then re-check if a
+         * flush out is needed. That will re-synchronized with other threads
+         * such that only one flush is scheduled at any given time.
+         */
+        if (!flushScheduled.compareAndSet(true, false)) {
+            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+        }
 
-        scheduleBarrierTimer(now);
+        conditionalFlush();
+    }
+
+    private void shutdownFlush() {
+        long entries = 0;
+
+        // Fail all queues
+        final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+        while (it.hasNext()) {
+            final OutboundQueueImpl queue = it.next();
+
+            entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+            if (queue.isFinished()) {
+                LOG.trace("Cleared queue {}", queue);
+                it.remove();
+            }
+        }
+
+        LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+
+        Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
+        if (currentQueue.isShutdown(shutdownOffset)) {
+            currentQueue = null;
+            handler.onConnectionQueueChanged(null);
+            LOG.debug("Channel {} shutdown complete", parent.getChannel());
+        } else {
+            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+            rescheduleFlush();
+        }
     }
 
     /**
-     * Perform a single flush operation.
+     * Perform a single flush operation. We keep it here so we do not generate
+     * syntetic accessors for private fields. Otherwise it could be moved into
+     * {@link #flushRunnable}.
      */
     protected void flush() {
+        // If the channel is gone, just flush whatever is not completed
+        if (shutdownOffset != null) {
+            shutdownFlush();
+            return;
+        }
+
         final long start = System.nanoTime();
         final long deadline = start + maxWorkTime;
 
@@ -313,7 +349,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         long messages = 0;
         for (;; ++messages) {
             if (!parent.getChannel().isWritable()) {
-                LOG.trace("Channel is no longer writable");
+                LOG.debug("Channel {} is no longer writable", parent.getChannel());
                 break;
             }
 
@@ -354,30 +390,20 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         LOG.debug("Flushed {} messages in {}us to channel {}",
                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
 
-        /*
-         * We are almost ready to terminate. This is a bit tricky, because
-         * we do not want to have a race window where a message would be
-         * stuck on the queue without a flush being scheduled.
-         *
-         * So we mark ourselves as not running and then re-check if a
-         * flush out is needed. That will re-synchronized with other threads
-         * such that only one flush is scheduled at any given time.
-         */
-        if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
-            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
-        }
-
-        conditionalFlush();
+        rescheduleFlush();
     }
 
-
     /**
      * Schedule a queue flush if it is not empty and the channel is found
      * to be writable. May only be called from Netty context.
      */
     private void conditionalFlush() {
-        if (!currentQueue.isEmpty()) {
-            scheduleFlush();
+        if (currentQueue.needsFlush()) {
+            if (shutdownOffset != null || parent.getChannel().isWritable()) {
+                scheduleFlush();
+            } else {
+                LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
+            }
         } else {
             LOG.trace("Queue is empty, no flush needed");
         }
@@ -394,31 +420,63 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         conditionalFlush(ctx);
     }
 
+    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
+        /*
+         * Tune channel write buffering. We increase the writability window
+         * to ensure we can flush an entire queue segment in one go. We definitely
+         * want to keep the difference above 64k, as that will ensure we use jam-packed
+         * TCP packets. UDP will fragment as appropriate.
+         */
+        ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
+        ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
+
+        super.handlerAdded(ctx);
+    }
+
     @Override
     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
         super.channelWritabilityChanged(ctx);
-        conditionalFlush(ctx);
+
+        if (flushScheduled.compareAndSet(false, true)) {
+            LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
+            flush();
+        } else {
+            LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
+        }
     }
 
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        long entries = 0;
-        LOG.debug("Channel shutdown, flushing queue...");
-        handler.onConnectionQueueChanged(null);
+        LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
 
-        final Throwable cause = new RejectedExecutionException("Channel disconnected");
-        for (OutboundQueueImpl queue : activeQueues) {
-            entries += queue.failAll(cause);
+        /*
+         * We are dealing with a multi-threaded shutdown, as the user may still
+         * be reserving entries in the queue. We are executing in a netty thread,
+         * so neither flush nor barrier can be running, which is good news.
+         *
+         * We will eat up all the slots in the queue here and mark the offset first
+         * reserved offset and free up all the cached queues. We then schedule
+         * the flush task, which will deal with the rest of the shutdown process.
+         */
+        shutdownOffset = currentQueue.startShutdown();
+        if (slice != null) {
+            slice.decRef();
+            slice = null;
         }
-        activeQueues.clear();
 
-        LOG.debug("Flushed {} queue entries", entries);
+        LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
+        scheduleFlush();
     }
 
     @Override
     public String toString() {
-        return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled);
+        return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
+    }
+
+    void onEchoRequest(final EchoRequestMessage message) {
+        final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
+        parent.getChannel().writeAndFlush(reply);
     }
 }