long messages = 0;
for (;; ++messages) {
if (!parent.getChannel().isWritable()) {
- LOG.trace("Channel is no longer writable");
+ LOG.debug("Channel {} is no longer writable", parent.getChannel());
break;
}
* to be writable. May only be called from Netty context.
*/
private void conditionalFlush() {
- if (currentQueue.needsFlush() && (shutdownOffset != null || parent.getChannel().isWritable())) {
- scheduleFlush();
+ if (currentQueue.needsFlush()) {
+ if (shutdownOffset != null || parent.getChannel().isWritable()) {
+ scheduleFlush();
+ } else {
+ LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
+ }
} else {
LOG.trace("Queue is empty, no flush needed");
}
@Override
public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
- conditionalFlush(ctx);
+
+ if (flushScheduled.compareAndSet(false, true)) {
+ LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
+ flush();
+ } else {
+ LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
+ }
}
@Override