import org.slf4j.LoggerFactory;
final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
+ private static enum PipelineState {
+ /**
+ * Netty thread is potentially idle, no assumptions
+ * can be made about its state.
+ */
+ IDLE,
+ /**
+ * Netty thread is currently reading, once the read completes,
+ * if will flush the queue in the {@link #FLUSHING} state.
+ */
+ READING,
+ /**
+ * Netty thread is currently performing a flush on the queue.
+ * It will then transition to {@link #IDLE} state.
+ */
+ WRITING,
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
/**
private final T handler;
// Accessed concurrently
- private volatile boolean reading;
+ private volatile PipelineState state = PipelineState.IDLE;
// Updated from netty only
private boolean alreadyReading;
}
private void writeAndFlush() {
+ state = PipelineState.WRITING;
+
final long start = System.nanoTime();
final int entries = currentQueue.writeEntries(parent.getChannel(), start);
if (entries > 0) {
- LOG.debug("Flushing channel {}", parent.getChannel());
+ LOG.trace("Flushing channel {}", parent.getChannel());
parent.getChannel().flush();
}
LOG.debug("Flushed {} messages to channel {} in {}us", entries,
parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start));
}
+
+ state = PipelineState.IDLE;
}
/**
protected void flush() {
// If the channel is gone, just flush whatever is not completed
if (!shuttingDown) {
- LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
+ LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
writeAndFlush();
rescheduleFlush();
} else if (currentQueue.finishShutdown()) {
public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
- // A simple trade-off. While we could write things right away, if there is a task
- // schedule, let it have the work
- if (flushScheduled.compareAndSet(false, true)) {
- LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
- flush();
- } else {
- LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
- }
+ // The channel is writable again. There may be a flush task on the way, but let's
+ // steal its work, potentially decreasing latency. Since there is a window between
+ // now and when it will run, it may still pick up some more work to do.
+ LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
+ writeAndFlush();
}
@Override
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
- // non-volatile read if we are called multiple times
+ // Netty does not provide a 'start reading' callback, so this is our first
+ // (and repeated) chance to detect reading. Since this callback can be invoked
+ // multiple times, we keep a boolean we check. That prevents a volatile write
+ // on repeated invocations. It will be cleared in channelReadComplete().
if (!alreadyReading) {
alreadyReading = true;
- reading = true;
+ state = PipelineState.READING;
}
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
- alreadyReading = false;
- reading = false;
-
- // TODO: model this as an atomic gate. We need to sync on it to make sure
- // that ensureFlushing() suppresses scheudling only if this barrier
- // has not been crossed.
- synchronized (this) {
- // Run flush regardless of writability. This is not strictly required, as
- // there may be a scheduled flush. Instead of canceling it, which is expensive,
- // we'll steal its work. Note that more work may accumulate in the time window
- // between now and when the task will run, so it may not be a no-op after all.
- //
- // The reason for this is to will the output buffer before we go into selection
- // phase. This will make sure the pipe is full (in which case our next wake up
- // will be the queue becoming writable).
- writeAndFlush();
- }
- LOG.debug("Opportunistic write on channel {}", parent.getChannel());
+ // Run flush regardless of writability. This is not strictly required, as
+ // there may be a scheduled flush. Instead of canceling it, which is expensive,
+ // we'll steal its work. Note that more work may accumulate in the time window
+ // between now and when the task will run, so it may not be a no-op after all.
+ //
+ // The reason for this is to will the output buffer before we go into selection
+ // phase. This will make sure the pipe is full (in which case our next wake up
+ // will be the queue becoming writable).
writeAndFlush();
}
// We are currently reading something, just a quick sync to ensure we will in fact
// flush state.
- if (reading) {
- synchronized (this) {
- if (reading) {
- return;
- }
- }
+ final PipelineState localState = state;
+ LOG.debug("Synchronize on pipeline state {}", localState);
+ switch (localState) {
+ case READING:
+ // Netty thread is currently reading, it will flush the pipeline once it
+ // finishes reading. This is a no-op situation.
+ break;
+ case WRITING:
+ case IDLE:
+ default:
+ // We cannot rely on the change being flushed, schedule a request
+ scheduleFlush();
}
-
- // Netty thread is outside our code, we need to schedule a flush
- // to re-synchronize.
- scheduleFlush();
}
void onEchoRequest(final EchoRequestMessage message) {