From 752e794406bc31ea5412f780b01d5db5c5b80e33 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 18 May 2015 18:11:04 +0200 Subject: [PATCH] BUG-3219: Fix flush task scheduling The flush task should be conditionally scheduled if we have any outstanding requests. We also cannot directly report the channel shutdown until we have ensured that all requests have been completed by the user. Change-Id: Ic8d266d6167eed15f3a51bf8476bbd1db3071dd4 Signed-off-by: Robert Varga --- .../core/connection/OutboundQueueImpl.java | 37 ++++++- .../core/connection/OutboundQueueManager.java | 98 ++++++++++++------- 2 files changed, 100 insertions(+), 35 deletions(-) diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java index 6713d5ce..4f708004 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueImpl.java @@ -135,12 +135,30 @@ final class OutboundQueueImpl implements OutboundQueue { } } + int startShutdown() { + // Increment the offset by the queue size, hence preventing any normal + // allocations. We should not be seeing a barrier reservation after this + // and if there is one issued, we can disregard it. + final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length); + + // If this offset is larger than reserve, trim it. That is not an accurate + // view of which slot was actually "reserved", but it indicates at which + // entry we can declare the queue flushed (e.g. at the emergency slot). + return offset > reserve ? reserve : offset; + } + + boolean isShutdown(final int offset) { + // This queue is shutdown if the flushOffset (e.g. the next entry to + // be flushed) points to the offset 'reserved' in startShutdown() + return flushOffset >= offset; + } + /** * An empty queue is a queue which has no further unflushed entries. * * @return True if this queue does not have unprocessed entries. */ - boolean isEmpty() { + private boolean isEmpty() { int ro = reserveOffset; if (ro >= reserve) { if (queue[reserve].isCommitted()) { @@ -179,6 +197,23 @@ final class OutboundQueueImpl implements OutboundQueue { return flushOffset >= queue.length || !queue[reserve].isCommitted(); } + boolean needsFlush() { + if (flushOffset < reserve) { + return queue[flushOffset].isCommitted(); + } + + if (isFlushed()) { + LOG.trace("Queue {} is flushed, schedule a replace", this); + return true; + } + if (isFinished()) { + LOG.trace("Queue {} is finished, schedule a cleanup", this); + return true; + } + + return false; + } + OfHeader flushEntry() { for (;;) { // No message ready diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java index 5d36859d..a4d3f974 100644 --- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java +++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java @@ -62,6 +62,7 @@ final class OutboundQueueManager extends Channel private boolean barrierTimerEnabled; private int nonBarrierMessages; private long lastXid = 0; + private Integer shutdownOffset; // Passed to executor to request triggering of flush private final Runnable flushRunnable = new Runnable() { @@ -270,7 +271,7 @@ final class OutboundQueueManager extends Channel protected void barrier() { LOG.debug("Channel {} barrier timer expired", parent.getChannel()); barrierTimerEnabled = false; - if (currentQueue == null) { + if (shutdownOffset != null) { LOG.trace("Channel shut down, not processing barrier"); return; } @@ -288,25 +289,58 @@ final class OutboundQueueManager extends Channel } } + private void rescheduleFlush() { + /* + * We are almost ready to terminate. This is a bit tricky, because + * we do not want to have a race window where a message would be + * stuck on the queue without a flush being scheduled. + * + * So we mark ourselves as not running and then re-check if a + * flush out is needed. That will re-synchronized with other threads + * such that only one flush is scheduled at any given time. + */ + if (!flushScheduled.compareAndSet(true, false)) { + LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this); + } + + conditionalFlush(); + } + + private void shutdownFlush() { + long entries = 0; + + // Fail all queues + final Iterator it = activeQueues.iterator(); + while (it.hasNext()) { + final OutboundQueueImpl queue = it.next(); + + entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED); + if (queue.isFinished()) { + LOG.trace("Cleared queue {}", queue); + it.remove(); + } + } + + LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel()); + + Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet"); + if (currentQueue.isShutdown(shutdownOffset)) { + currentQueue = null; + handler.onConnectionQueueChanged(null); + LOG.debug("Channel {} shutdown complete", parent.getChannel()); + } else { + LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel()); + rescheduleFlush(); + } + } + /** * Perform a single flush operation. */ protected void flush() { // If the channel is gone, just flush whatever is not completed - if (currentQueue == null) { - long entries = 0; - - final Iterator it = activeQueues.iterator(); - while (it.hasNext()) { - final OutboundQueueImpl queue = it.next(); - entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED); - if (queue.isFinished()) { - LOG.trace("Cleared queue {}", queue); - it.remove(); - } - } - - LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel()); + if (shutdownOffset != null) { + shutdownFlush(); return; } @@ -359,20 +393,7 @@ final class OutboundQueueManager extends Channel LOG.debug("Flushed {} messages in {}us to channel {}", messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel()); - /* - * We are almost ready to terminate. This is a bit tricky, because - * we do not want to have a race window where a message would be - * stuck on the queue without a flush being scheduled. - * - * So we mark ourselves as not running and then re-check if a - * flush out is needed. That will re-synchronized with other threads - * such that only one flush is scheduled at any given time. - */ - if (!flushScheduled.compareAndSet(true, false)) { - LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this); - } - - conditionalFlush(); + rescheduleFlush(); } /** @@ -380,7 +401,7 @@ final class OutboundQueueManager extends Channel * to be writable. May only be called from Netty context. */ private void conditionalFlush() { - if (!currentQueue.isEmpty()) { + if (currentQueue.needsFlush()) { scheduleFlush(); } else { LOG.trace("Queue is empty, no flush needed"); @@ -408,11 +429,20 @@ final class OutboundQueueManager extends Channel public void channelInactive(final ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); - LOG.debug("Channel {} shutdown, flushing queue...", parent.getChannel()); - handler.onConnectionQueueChanged(null); - currentQueue = null; - queueCache.clear(); + LOG.debug("Channel {} initiating shutdown...", parent.getChannel()); + /* + * We are dealing with a multi-threaded shutdown, as the user may still + * be reserving entries in the queue. We are executing in a netty thread, + * so neither flush nor barrier can be running, which is good news. + * + * We will eat up all the slots in the queue here and mark the offset first + * reserved offset and free up all the cached queues. We then schedule + * the flush task, which will deal with the rest of the shutdown process. + */ + shutdownOffset = currentQueue.startShutdown(); + queueCache.clear(); + LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset); scheduleFlush(); } -- 2.36.6