}
}
+ /**
+ * Begin the shutdown sequence for this queue by consuming every remaining
+ * reservation slot, so no further normal allocations can succeed.
+ *
+ * @return The offset at which this queue can be declared fully flushed,
+ *         suitable for passing to {@code isShutdown(int)}.
+ */
+ int startShutdown() {
+ // Increment the offset by the queue size, hence preventing any normal
+ // allocations. We should not be seeing a barrier reservation after this
+ // and if there is one issued, we can disregard it.
+ final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length);
+
+ // If this offset is larger than reserve, trim it. That is not an accurate
+ // view of which slot was actually "reserved", but it indicates at which
+ // entry we can declare the queue flushed (e.g. at the emergency slot).
+ return offset > reserve ? reserve : offset;
+ }
+
+ /**
+ * Check whether this queue has completed its shutdown sequence.
+ *
+ * @param offset Shutdown offset previously returned by {@code startShutdown()}.
+ * @return True if all entries up to the shutdown offset have been flushed.
+ */
+ boolean isShutdown(final int offset) {
+ // This queue is shutdown if the flushOffset (e.g. the next entry to
+ // be flushed) points to the offset 'reserved' in startShutdown()
+ return flushOffset >= offset;
+ }
+
/**
* An empty queue is a queue which has no further unflushed entries.
*
* @return True if this queue does not have unprocessed entries.
*/
- boolean isEmpty() {
+ private boolean isEmpty() {
int ro = reserveOffset;
if (ro >= reserve) {
if (queue[reserve].isCommitted()) {
return flushOffset >= queue.length || !queue[reserve].isCommitted();
}
+ /**
+ * Check whether this queue has work pending for the flusher.
+ *
+ * @return True if a flush should be scheduled for this queue.
+ */
+ boolean needsFlush() {
+ // An entry below the reserve line is flushable as soon as it is committed
+ if (flushOffset < reserve) {
+ return queue[flushOffset].isCommitted();
+ }
+
+ // Past the reserve line: a flush is still needed to either replace a
+ // fully-flushed queue or clean up a finished one
+ if (isFlushed()) {
+ LOG.trace("Queue {} is flushed, schedule a replace", this);
+ return true;
+ }
+ if (isFinished()) {
+ LOG.trace("Queue {} is finished, schedule a cleanup", this);
+ return true;
+ }
+
+ return false;
+ }
+
OfHeader flushEntry() {
for (;;) {
// No message ready
private boolean barrierTimerEnabled;
private int nonBarrierMessages;
private long lastXid = 0;
+ // Non-null once channelInactive() has started the shutdown sequence; holds
+ // the offset returned by OutboundQueueImpl.startShutdown(). NOTE(review):
+ // appears to be written and read only from netty/flusher context — confirm.
+ private Integer shutdownOffset;
// Passed to executor to request triggering of flush
private final Runnable flushRunnable = new Runnable() {
protected void barrier() {
LOG.debug("Channel {} barrier timer expired", parent.getChannel());
barrierTimerEnabled = false;
- if (currentQueue == null) {
+ if (shutdownOffset != null) {
LOG.trace("Channel shut down, not processing barrier");
return;
}
}
}
+ /**
+ * Mark this flusher as idle and immediately re-check whether another flush
+ * is needed, closing the race window with producers enqueueing concurrently.
+ */
+ private void rescheduleFlush() {
+ /*
+ * We are almost ready to terminate. This is a bit tricky, because
+ * we do not want to have a race window where a message would be
+ * stuck on the queue without a flush being scheduled.
+ *
+ * So we mark ourselves as not running and then re-check if a
+ * flush out is needed. That will re-synchronized with other threads
+ * such that only one flush is scheduled at any given time.
+ */
+ if (!flushScheduled.compareAndSet(true, false)) {
+ LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+ }
+
+ conditionalFlush();
+ }
+
+ /**
+ * Flush path taken while shutdown is in progress: fail all outstanding
+ * entries and, once the current queue has drained up to the shutdown
+ * offset, detach it and report the queue change. Otherwise reschedule
+ * another flush pass.
+ */
+ private void shutdownFlush() {
+ long entries = 0;
+
+ // Fail all queues
+ final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+ while (it.hasNext()) {
+ final OutboundQueueImpl queue = it.next();
+
+ entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+ if (queue.isFinished()) {
+ LOG.trace("Cleared queue {}", queue);
+ it.remove();
+ }
+ }
+
+ LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+
+ // shutdownOffset is non-null here, so unboxing below is safe
+ Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
+ if (currentQueue.isShutdown(shutdownOffset)) {
+ currentQueue = null;
+ handler.onConnectionQueueChanged(null);
+ LOG.debug("Channel {} shutdown complete", parent.getChannel());
+ } else {
+ LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+ rescheduleFlush();
+ }
+ }
+
/**
* Perform a single flush operation.
*/
protected void flush() {
// If the channel is gone, just flush whatever is not completed
- if (currentQueue == null) {
- long entries = 0;
-
- final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
- while (it.hasNext()) {
- final OutboundQueueImpl queue = it.next();
- entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
- if (queue.isFinished()) {
- LOG.trace("Cleared queue {}", queue);
- it.remove();
- }
- }
-
- LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+ // Shutdown in progress: the dedicated path fails and drains the queues
+ if (shutdownOffset != null) {
+ shutdownFlush();
return;
}
LOG.debug("Flushed {} messages in {}us to channel {}",
messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
- /*
- * We are almost ready to terminate. This is a bit tricky, because
- * we do not want to have a race window where a message would be
- * stuck on the queue without a flush being scheduled.
- *
- * So we mark ourselves as not running and then re-check if a
- * flush out is needed. That will re-synchronized with other threads
- * such that only one flush is scheduled at any given time.
- */
- if (!flushScheduled.compareAndSet(true, false)) {
- LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
- }
-
- conditionalFlush();
+ // Mark the flusher idle and re-check for pending work
+ rescheduleFlush();
}
/**
* to be writable. May only be called from Netty context.
*/
private void conditionalFlush() {
- if (!currentQueue.isEmpty()) {
+ if (currentQueue.needsFlush()) {
scheduleFlush();
} else {
LOG.trace("Queue is empty, no flush needed");
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
- LOG.debug("Channel {} shutdown, flushing queue...", parent.getChannel());
- handler.onConnectionQueueChanged(null);
- currentQueue = null;
- queueCache.clear();
+ LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
+ /*
+ * We are dealing with a multi-threaded shutdown, as the user may still
+ * be reserving entries in the queue. We are executing in a netty thread,
+ * so neither flush nor barrier can be running, which is good news.
+ *
+ * We will eat up all the slots in the queue here and mark the offset first
+ * reserved offset and free up all the cached queues. We then schedule
+ * the flush task, which will deal with the rest of the shutdown process.
+ */
+ shutdownOffset = currentQueue.startShutdown();
+ queueCache.clear();
+ LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
+ // The scheduled flush performs the actual teardown asynchronously
scheduleFlush();
}