Instead of scheduling it on the executor, make sure we run the flush
task immediately.
Change-Id: Ic0cf9d031127d8e744a5cd79cfe440d52b3c3f0b
Signed-off-by: Robert Varga <rovarga@cisco.com>
}
// We have traveled back, recover
}
// We have traveled back, recover
+ LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
long messages = 0;
for (;; ++messages) {
if (!parent.getChannel().isWritable()) {
long messages = 0;
for (;; ++messages) {
if (!parent.getChannel().isWritable()) {
- LOG.trace("Channel is no longer writable");
+ LOG.debug("Channel {} is no longer writable", parent.getChannel());
* to be writable. May only be called from Netty context.
*/
private void conditionalFlush() {
* to be writable. May only be called from Netty context.
*/
private void conditionalFlush() {
- if (currentQueue.needsFlush() && (shutdownOffset != null || parent.getChannel().isWritable())) {
- scheduleFlush();
+ if (currentQueue.needsFlush()) {
+ if (shutdownOffset != null || parent.getChannel().isWritable()) {
+ scheduleFlush();
+ } else {
+ LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
+ }
} else {
LOG.trace("Queue is empty, no flush needed");
}
} else {
LOG.trace("Queue is empty, no flush needed");
}
@Override
public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
@Override
public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
+
+ if (flushScheduled.compareAndSet(false, true)) {
+ LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
+ flush();
+ } else {
+ LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
+ }