Tune write low/highwatermark
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
index 5c9a3515cea1de1a0d28a7d29c2d3c646dd9996c..66fa8a60ca799fe9772bfd5db0e65bceb0f43382 100644 (file)
@@ -11,7 +11,6 @@ import com.google.common.base.Preconditions;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
@@ -20,6 +19,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,26 +44,34 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private static final int WORKTIME_RECHECK_MSGS = 64;
 
     /**
-     * We maintain a cache of this many previous queues for later reuse.
+     * Default low write watermark. Channel will become writable when number of outstanding
+     * bytes dips below this value.
      */
-    private static final int QUEUE_CACHE_SIZE = 4;
+    private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
+
+    /**
+     * Default write high watermark. Channel will become un-writable when number of
+     * outstanding bytes hits this value.
+     */
+    private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
+
 
-    private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
     private final AtomicBoolean flushScheduled = new AtomicBoolean();
     private final ConnectionAdapterImpl parent;
     private final InetSocketAddress address;
     private final long maxBarrierNanos;
     private final long maxWorkTime;
-    private final int queueSize;
     private final T handler;
 
     // Updated from netty only
     private long lastBarrierNanos = System.nanoTime();
+    private OutboundQueueCacheSlice slice;
     private OutboundQueueImpl currentQueue;
     private boolean barrierTimerEnabled;
     private int nonBarrierMessages;
     private long lastXid = 0;
+    private Integer shutdownOffset;
 
     // Passed to executor to request triggering of flush
     private final Runnable flushRunnable = new Runnable() {
@@ -70,6 +80,8 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             flush();
         }
     };
+
+    // Passed to executor to request a periodic barrier check
     private final Runnable barrierRunnable = new Runnable() {
         @Override
         public void run() {
@@ -78,17 +90,16 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     };
 
     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
-        final int queueSize, final long maxBarrierNanos) {
+        final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
         this.parent = Preconditions.checkNotNull(parent);
         this.handler = Preconditions.checkNotNull(handler);
-        Preconditions.checkArgument(queueSize > 0);
-        this.queueSize = queueSize;
+        this.slice = Preconditions.checkNotNull(slice);
         Preconditions.checkArgument(maxBarrierNanos > 0);
         this.maxBarrierNanos = maxBarrierNanos;
         this.address = address;
         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
 
-        LOG.debug("Queue manager instantiated with queue size {}", queueSize);
+        LOG.debug("Queue manager instantiated with queue slice {}", slice);
         createQueue();
     }
 
@@ -99,30 +110,17 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     @Override
     public void close() {
         handler.onConnectionQueueChanged(null);
-    }
-
-    private void retireQueue(final OutboundQueueImpl queue) {
-        if (queueCache.offer(queue)) {
-            LOG.trace("Saving queue {} for later reuse", queue);
-        } else {
-            LOG.trace("Queue {} thrown away", queue);
+        if (slice != null) {
+            slice.decRef();
+            slice = null;
         }
     }
 
     private void createQueue() {
         final long baseXid = lastXid;
-        lastXid += queueSize + 1;
-
-        final OutboundQueueImpl cached = queueCache.poll();
-        final OutboundQueueImpl queue;
-        if (cached != null) {
-            queue = cached.reuse(baseXid);
-            LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
-        } else {
-            queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
-            LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
-        }
+        lastXid += slice.getQueueSize() + 1;
 
+        final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
         activeQueues.add(queue);
         currentQueue = queue;
         handler.onConnectionQueueChanged(queue);
@@ -181,7 +179,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             lastBarrierNanos = now;
         } else {
             nonBarrierMessages++;
-            if (nonBarrierMessages >= queueSize) {
+            if (nonBarrierMessages >= slice.getQueueSize()) {
                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
                 scheduleBarrierMessage();
             } else if (!barrierTimerEnabled) {
@@ -228,7 +226,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
                         LOG.trace("Queue {} is implied finished", q);
                         q.completeAll();
                         it.remove();
-                        retireQueue(q);
+                        slice.putQueue(q);
                     } else {
                         break;
                     }
@@ -238,7 +236,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             if (queue.isFinished()) {
                 LOG.trace("Queue {} is finished", queue);
                 it.remove();
-                retireQueue(queue);
+                slice.putQueue(queue);
             }
 
             return true;
@@ -249,15 +247,11 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     }
 
     private void scheduleFlush() {
-        if (parent.getChannel().isWritable()) {
-            if (flushScheduled.compareAndSet(false, true)) {
-                LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
-                parent.getChannel().eventLoop().execute(flushRunnable);
-            } else {
-                LOG.trace("Flush task is already present on channel {}", parent.getChannel());
-            }
+        if (flushScheduled.compareAndSet(false, true)) {
+            LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+            parent.getChannel().eventLoop().execute(flushRunnable);
         } else {
-            LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+            LOG.trace("Flush task is already present on channel {}", parent.getChannel());
         }
     }
 
@@ -272,7 +266,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     protected void barrier() {
         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
         barrierTimerEnabled = false;
-        if (currentQueue == null) {
+        if (shutdownOffset != null) {
             LOG.trace("Channel shut down, not processing barrier");
             return;
         }
@@ -290,10 +284,63 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         }
     }
 
+    private void rescheduleFlush() {
+        /*
+         * We are almost ready to terminate. This is a bit tricky, because
+         * we do not want to have a race window where a message would be
+         * stuck on the queue without a flush being scheduled.
+         *
+         * So we mark ourselves as not running and then re-check if a
+         * flush out is needed. That will re-synchronized with other threads
+         * such that only one flush is scheduled at any given time.
+         */
+        if (!flushScheduled.compareAndSet(true, false)) {
+            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+        }
+
+        conditionalFlush();
+    }
+
+    private void shutdownFlush() {
+        long entries = 0;
+
+        // Fail all queues
+        final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+        while (it.hasNext()) {
+            final OutboundQueueImpl queue = it.next();
+
+            entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+            if (queue.isFinished()) {
+                LOG.trace("Cleared queue {}", queue);
+                it.remove();
+            }
+        }
+
+        LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+
+        Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
+        if (currentQueue.isShutdown(shutdownOffset)) {
+            currentQueue = null;
+            handler.onConnectionQueueChanged(null);
+            LOG.debug("Channel {} shutdown complete", parent.getChannel());
+        } else {
+            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+            rescheduleFlush();
+        }
+    }
+
     /**
-     * Perform a single flush operation.
+     * Perform a single flush operation. We keep it here so we do not generate
+     * syntetic accessors for private fields. Otherwise it could be moved into
+     * {@link #flushRunnable}.
      */
     protected void flush() {
+        // If the channel is gone, just flush whatever is not completed
+        if (shutdownOffset != null) {
+            shutdownFlush();
+            return;
+        }
+
         final long start = System.nanoTime();
         final long deadline = start + maxWorkTime;
 
@@ -302,7 +349,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         long messages = 0;
         for (;; ++messages) {
             if (!parent.getChannel().isWritable()) {
-                LOG.trace("Channel is no longer writable");
+                LOG.debug("Channel {} is no longer writable", parent.getChannel());
                 break;
             }
 
@@ -343,30 +390,20 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         LOG.debug("Flushed {} messages in {}us to channel {}",
                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
 
-        /*
-         * We are almost ready to terminate. This is a bit tricky, because
-         * we do not want to have a race window where a message would be
-         * stuck on the queue without a flush being scheduled.
-         *
-         * So we mark ourselves as not running and then re-check if a
-         * flush out is needed. That will re-synchronized with other threads
-         * such that only one flush is scheduled at any given time.
-         */
-        if (!flushScheduled.compareAndSet(true, false)) {
-            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
-        }
-
-        conditionalFlush();
+        rescheduleFlush();
     }
 
-
     /**
      * Schedule a queue flush if it is not empty and the channel is found
      * to be writable. May only be called from Netty context.
      */
     private void conditionalFlush() {
-        if (!currentQueue.isEmpty()) {
-            scheduleFlush();
+        if (currentQueue.needsFlush()) {
+            if (shutdownOffset != null || parent.getChannel().isWritable()) {
+                scheduleFlush();
+            } else {
+                LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
+            }
         } else {
             LOG.trace("Queue is empty, no flush needed");
         }
@@ -383,30 +420,63 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         conditionalFlush(ctx);
     }
 
+    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
+        /*
+         * Tune channel write buffering. We increase the writability window
+         * to ensure we can flush an entire queue segment in one go. We definitely
+         * want to keep the difference above 64k, as that will ensure we use jam-packed
+         * TCP packets. UDP will fragment as appropriate.
+         */
+        ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
+        ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
+
+        super.handlerAdded(ctx);
+    }
+
     @Override
     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
         super.channelWritabilityChanged(ctx);
-        conditionalFlush(ctx);
+
+        if (flushScheduled.compareAndSet(false, true)) {
+            LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
+            flush();
+        } else {
+            LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
+        }
     }
 
     @Override
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        long entries = 0;
-        LOG.debug("Channel shutdown, flushing queue...");
-        handler.onConnectionQueueChanged(null);
+        LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
 
-        for (OutboundQueueImpl queue : activeQueues) {
-            entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+        /*
+         * We are dealing with a multi-threaded shutdown, as the user may still
+         * be reserving entries in the queue. We are executing in a netty thread,
+         * so neither flush nor barrier can be running, which is good news.
+         *
+         * We will eat up all the slots in the queue here and mark the offset first
+         * reserved offset and free up all the cached queues. We then schedule
+         * the flush task, which will deal with the rest of the shutdown process.
+         */
+        shutdownOffset = currentQueue.startShutdown();
+        if (slice != null) {
+            slice.decRef();
+            slice = null;
         }
-        activeQueues.clear();
 
-        LOG.debug("Flushed {} queue entries", entries);
+        LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
+        scheduleFlush();
     }
 
     @Override
     public String toString() {
         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
     }
+
+    void onEchoRequest(final EchoRequestMessage message) {
+        final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
+        parent.getChannel().writeAndFlush(reply);
+    }
 }