Run flush immediately when channel becomes writable 76/21576/4
authorRobert Varga <rovarga@cisco.com>
Mon, 1 Jun 2015 20:52:14 +0000 (22:52 +0200)
committerRobert Varga <rovarga@cisco.com>
Tue, 2 Jun 2015 09:03:55 +0000 (11:03 +0200)
Instead of scheduling it on the executor, make sure we run the flush
task immediately.

Change-Id: Ic0cf9d031127d8e744a5cd79cfe440d52b3c3f0b
Signed-off-by: Robert Varga <rovarga@cisco.com>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java

index 5698eeec902382690f1cf45ef1f2b79a895e4535..c4feb26af88c3680770cf1e311fb7c79e4f67776 100644 (file)
@@ -107,6 +107,7 @@ final class OutboundQueueImpl implements OutboundQueue {
                 }
 
                 // We have traveled back, recover
+                LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
                 my = prev;
             }
         }
index 948424f91cfb063d96cf3c95ebf3cc65205d6a75..b3b7b8c384b49af1a09ec2e56a214a8ba6c37d8a 100644 (file)
@@ -334,7 +334,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         long messages = 0;
         for (;; ++messages) {
             if (!parent.getChannel().isWritable()) {
-                LOG.trace("Channel is no longer writable");
+                LOG.debug("Channel {} is no longer writable", parent.getChannel());
                 break;
             }
 
@@ -383,8 +383,12 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      * to be writable. May only be called from Netty context.
      */
     private void conditionalFlush() {
-        if (currentQueue.needsFlush() && (shutdownOffset != null || parent.getChannel().isWritable())) {
-            scheduleFlush();
+        if (currentQueue.needsFlush()) {
+            if (shutdownOffset != null || parent.getChannel().isWritable()) {
+                scheduleFlush();
+            } else {
+                LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
+            }
         } else {
             LOG.trace("Queue is empty, no flush needed");
         }
@@ -404,7 +408,13 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     @Override
     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
         super.channelWritabilityChanged(ctx);
-        conditionalFlush(ctx);
+
+        if (flushScheduled.compareAndSet(false, true)) {
+            LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
+            flush();
+        } else {
+            LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
+        }
     }
 
     @Override