BUG-3219: Fix flush task scheduling 60/20660/4
authorRobert Varga <rovarga@cisco.com>
Mon, 18 May 2015 16:11:04 +0000 (18:11 +0200)
committerRobert Varga <rovarga@cisco.com>
Mon, 18 May 2015 18:04:44 +0000 (20:04 +0200)
The flush task should be conditionally scheduled if we have any
outstanding requests. We also cannot directly report the channel
shutdown until we have ensured that all requests have been completed by
the user.

Change-Id: Ic8d266d6167eed15f3a51bf8476bbd1db3071dd4
Signed-off-by: Robert Varga <rovarga@cisco.com>
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java
openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java

index 6713d5ceab000eb5df956833381dfd4af640674e..4f708004eb08420629ad09ee8dc24bcc0c19d74c 100644 (file)
@@ -135,12 +135,30 @@ final class OutboundQueueImpl implements OutboundQueue {
         }
     }
 
+    int startShutdown() {
+        // Increment the offset by the queue size, hence preventing any normal
+        // allocations. We should not be seeing a barrier reservation after this
+        // and if there is one issued, we can disregard it.
+        final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length);
+
+        // If this offset is larger than reserve, trim it. That is not an accurate
+        // view of which slot was actually "reserved", but it indicates at which
+        // entry we can declare the queue flushed (e.g. at the emergency slot).
+        return offset > reserve ? reserve : offset;
+    }
+
+    boolean isShutdown(final int offset) {
+        // This queue is shutdown if the flushOffset (e.g. the next entry to
+        // be flushed) points to the offset 'reserved' in startShutdown()
+        return flushOffset >= offset;
+    }
+
     /**
      * An empty queue is a queue which has no further unflushed entries.
      *
      * @return True if this queue does not have unprocessed entries.
      */
-    boolean isEmpty() {
+    private boolean isEmpty() {
         int ro = reserveOffset;
         if (ro >= reserve) {
             if (queue[reserve].isCommitted()) {
@@ -179,6 +197,23 @@ final class OutboundQueueImpl implements OutboundQueue {
         return flushOffset >= queue.length || !queue[reserve].isCommitted();
     }
 
+    boolean needsFlush() {
+        if (flushOffset < reserve) {
+            return queue[flushOffset].isCommitted();
+        }
+
+        if (isFlushed()) {
+            LOG.trace("Queue {} is flushed, schedule a replace", this);
+            return true;
+        }
+        if (isFinished()) {
+            LOG.trace("Queue {} is finished, schedule a cleanup", this);
+            return true;
+        }
+
+        return false;
+    }
+
     OfHeader flushEntry() {
         for (;;) {
             // No message ready
index 5d36859d6299a92dd83e7646b8ec4279f25c9bb7..a4d3f9746928dba8e7953db7923754a576a59957 100644 (file)
@@ -62,6 +62,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     private boolean barrierTimerEnabled;
     private int nonBarrierMessages;
     private long lastXid = 0;
+    private Integer shutdownOffset;
 
     // Passed to executor to request triggering of flush
     private final Runnable flushRunnable = new Runnable() {
@@ -270,7 +271,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     protected void barrier() {
         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
         barrierTimerEnabled = false;
-        if (currentQueue == null) {
+        if (shutdownOffset != null) {
             LOG.trace("Channel shut down, not processing barrier");
             return;
         }
@@ -288,25 +289,58 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         }
     }
 
+    private void rescheduleFlush() {
+        /*
+         * We are almost ready to terminate. This is a bit tricky, because
+         * we do not want to have a race window where a message would be
+         * stuck on the queue without a flush being scheduled.
+         *
+         * So we mark ourselves as not running and then re-check if a
+         * flush out is needed. That will re-synchronized with other threads
+         * such that only one flush is scheduled at any given time.
+         */
+        if (!flushScheduled.compareAndSet(true, false)) {
+            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+        }
+
+        conditionalFlush();
+    }
+
+    private void shutdownFlush() {
+        long entries = 0;
+
+        // Fail all queues
+        final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+        while (it.hasNext()) {
+            final OutboundQueueImpl queue = it.next();
+
+            entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+            if (queue.isFinished()) {
+                LOG.trace("Cleared queue {}", queue);
+                it.remove();
+            }
+        }
+
+        LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+
+        Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
+        if (currentQueue.isShutdown(shutdownOffset)) {
+            currentQueue = null;
+            handler.onConnectionQueueChanged(null);
+            LOG.debug("Channel {} shutdown complete", parent.getChannel());
+        } else {
+            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+            rescheduleFlush();
+        }
+    }
+
     /**
      * Perform a single flush operation.
      */
     protected void flush() {
         // If the channel is gone, just flush whatever is not completed
-        if (currentQueue == null) {
-            long entries = 0;
-
-            final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
-            while (it.hasNext()) {
-                final OutboundQueueImpl queue = it.next();
-                entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
-                if (queue.isFinished()) {
-                    LOG.trace("Cleared queue {}", queue);
-                    it.remove();
-                }
-            }
-
-            LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+        if (shutdownOffset != null) {
+            shutdownFlush();
             return;
         }
 
@@ -359,20 +393,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         LOG.debug("Flushed {} messages in {}us to channel {}",
                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
 
-        /*
-         * We are almost ready to terminate. This is a bit tricky, because
-         * we do not want to have a race window where a message would be
-         * stuck on the queue without a flush being scheduled.
-         *
-         * So we mark ourselves as not running and then re-check if a
-         * flush out is needed. That will re-synchronized with other threads
-         * such that only one flush is scheduled at any given time.
-         */
-        if (!flushScheduled.compareAndSet(true, false)) {
-            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
-        }
-
-        conditionalFlush();
+        rescheduleFlush();
     }
 
     /**
@@ -380,7 +401,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
      * to be writable. May only be called from Netty context.
      */
     private void conditionalFlush() {
-        if (!currentQueue.isEmpty()) {
+        if (currentQueue.needsFlush()) {
             scheduleFlush();
         } else {
             LOG.trace("Queue is empty, no flush needed");
@@ -408,11 +429,20 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
-        LOG.debug("Channel {} shutdown, flushing queue...", parent.getChannel());
-        handler.onConnectionQueueChanged(null);
-        currentQueue = null;
-        queueCache.clear();
+        LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
 
+        /*
+         * We are dealing with a multi-threaded shutdown, as the user may still
+         * be reserving entries in the queue. We are executing in a netty thread,
+         * so neither flush nor barrier can be running, which is good news.
+         *
+         * We will eat up all the slots in the queue here and mark the offset first
+         * reserved offset and free up all the cached queues. We then schedule
+         * the flush task, which will deal with the rest of the shutdown process.
+         */
+        shutdownOffset = currentQueue.startShutdown();
+        queueCache.clear();
+        LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
         scheduleFlush();
     }