flush();
}
};
+
+ // Passed to executor to request a periodic barrier check
private final Runnable barrierRunnable = new Runnable() {
@Override
public void run() {
}
private void scheduleFlush() {
- if (parent.getChannel().isWritable()) {
- if (flushScheduled.compareAndSet(false, true)) {
- LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
- parent.getChannel().eventLoop().execute(flushRunnable);
- } else {
- LOG.trace("Flush task is already present on channel {}", parent.getChannel());
- }
+ if (flushScheduled.compareAndSet(false, true)) {
+ LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+ parent.getChannel().eventLoop().execute(flushRunnable);
} else {
- LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+ LOG.trace("Flush task is already present on channel {}", parent.getChannel());
}
}
* Perform a single flush operation.
*/
protected void flush() {
+ // If the channel is gone, just flush whatever is not completed
+ if (currentQueue == null) {
+ long entries = 0;
+
+ final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+ while (it.hasNext()) {
+ final OutboundQueueImpl queue = it.next();
+ entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+ if (queue.isFinished()) {
+ LOG.trace("Cleared queue {}", queue);
+ it.remove();
+ }
+ }
+
+ LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+ return;
+ }
+
final long start = System.nanoTime();
final long deadline = start + maxWorkTime;
conditionalFlush();
}
-
/**
* Schedule a queue flush if it is not empty and the channel is found
* to be writable. May only be called from Netty context.
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
- long entries = 0;
- LOG.debug("Channel shutdown, flushing queue...");
+ LOG.debug("Channel {} shutdown, flushing queue...", parent.getChannel());
handler.onConnectionQueueChanged(null);
+ currentQueue = null;
+ queueCache.clear();
- for (OutboundQueueImpl queue : activeQueues) {
- entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
- }
- activeQueues.clear();
-
- LOG.debug("Flushed {} queue entries", entries);
+ scheduleFlush();
}
@Override