Run flush immediately when channel becomes writable
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
index 609e23c16acf52616d113eda51dbc6423f185970..b3b7b8c384b49af1a09ec2e56a214a8ba6c37d8a 100644 (file)
@@ -11,7 +11,6 @@ import com.google.common.base.Preconditions;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Queue;
@@ -44,23 +43,17 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      */
     private static final int WORKTIME_RECHECK_MSGS = 64;
 
-    /**
-     * We maintain a cache of this many previous queues for later reuse.
-     */
-    private static final int QUEUE_CACHE_SIZE = 4;
-
-    private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
     private final AtomicBoolean flushScheduled = new AtomicBoolean();
     private final ConnectionAdapterImpl parent;
     private final InetSocketAddress address;
     private final long maxBarrierNanos;
     private final long maxWorkTime;
-    private final int queueSize;
     private final T handler;
 
     // Updated from netty only
     private long lastBarrierNanos = System.nanoTime();
+    private OutboundQueueCacheSlice slice;
     private OutboundQueueImpl currentQueue;
     private boolean barrierTimerEnabled;
     private int nonBarrierMessages;
@@ -84,17 +77,16 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     };
 
     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
-        final int queueSize, final long maxBarrierNanos) {
+        final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
         this.parent = Preconditions.checkNotNull(parent);
         this.handler = Preconditions.checkNotNull(handler);
-        Preconditions.checkArgument(queueSize > 0);
-        this.queueSize = queueSize;
+        this.slice = Preconditions.checkNotNull(slice);
         Preconditions.checkArgument(maxBarrierNanos > 0);
         this.maxBarrierNanos = maxBarrierNanos;
         this.address = address;
         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
 
-        LOG.debug("Queue manager instantiated with queue size {}", queueSize);
+        LOG.debug("Queue manager instantiated with queue slice {}", slice);
         createQueue();
     }
 
@@ -105,30 +97,17 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     @Override
     public void close() {
         handler.onConnectionQueueChanged(null);
-    }
-
-    private void retireQueue(final OutboundQueueImpl queue) {
-        if (queueCache.offer(queue)) {
-            LOG.trace("Saving queue {} for later reuse", queue);
-        } else {
-            LOG.trace("Queue {} thrown away", queue);
+        if (slice != null) {
+            slice.decRef();
+            slice = null;
         }
     }
 
     private void createQueue() {
         final long baseXid = lastXid;
-        lastXid += queueSize + 1;
-
-        final OutboundQueueImpl cached = queueCache.poll();
-        final OutboundQueueImpl queue;
-        if (cached != null) {
-            queue = cached.reuse(baseXid);
-            LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
-        } else {
-            queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
-            LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
-        }
+        lastXid += slice.getQueueSize() + 1;
 
+        final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
         activeQueues.add(queue);
         currentQueue = queue;
         handler.onConnectionQueueChanged(queue);
@@ -187,7 +166,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             lastBarrierNanos = now;
         } else {
             nonBarrierMessages++;
-            if (nonBarrierMessages >= queueSize) {
+            if (nonBarrierMessages >= slice.getQueueSize()) {
                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
                 scheduleBarrierMessage();
             } else if (!barrierTimerEnabled) {
@@ -234,7 +213,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
                         LOG.trace("Queue {} is implied finished", q);
                         q.completeAll();
                         it.remove();
-                        retireQueue(q);
+                        slice.putQueue(q);
                     } else {
                         break;
                     }
@@ -244,7 +223,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             if (queue.isFinished()) {
                 LOG.trace("Queue {} is finished", queue);
                 it.remove();
-                retireQueue(queue);
+                slice.putQueue(queue);
             }
 
             return true;
@@ -355,7 +334,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         long messages = 0;
         for (;; ++messages) {
             if (!parent.getChannel().isWritable()) {
-                LOG.trace("Channel is no longer writable");
+                LOG.debug("Channel {} is no longer writable", parent.getChannel());
                 break;
             }
 
@@ -404,8 +383,12 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      * to be writable. May only be called from Netty context.
      */
     private void conditionalFlush() {
-        if (currentQueue.needsFlush() && (shutdownOffset != null || parent.getChannel().isWritable())) {
-            scheduleFlush();
+        if (currentQueue.needsFlush()) {
+            if (shutdownOffset != null || parent.getChannel().isWritable()) {
+                scheduleFlush();
+            } else {
+                LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
+            }
         } else {
             LOG.trace("Queue is empty, no flush needed");
         }
@@ -425,7 +408,13 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     @Override
     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
         super.channelWritabilityChanged(ctx);
-        conditionalFlush(ctx);
+
+        if (flushScheduled.compareAndSet(false, true)) {
+            LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
+            flush();
+        } else {
+            LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
+        }
     }
 
     @Override
@@ -444,7 +433,11 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
          * the flush task, which will deal with the rest of the shutdown process.
          */
         shutdownOffset = currentQueue.startShutdown();
-        queueCache.clear();
+        if (slice != null) {
+            slice.decRef();
+            slice = null;
+        }
+
         LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
         scheduleFlush();
     }