import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
private static final int WORKTIME_RECHECK_MSGS = 64;
/**
- * We maintain a cache of this many previous queues for later reuse.
+ * Default write low watermark. Channel will become writable when the number of
+ * outstanding bytes dips below this value.
*/
- private static final int QUEUE_CACHE_SIZE = 4;
+ private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
+
+ /**
+ * Default write high watermark. Channel will become unwritable when the number of
+ * outstanding bytes hits this value.
+ */
+ private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
+
- private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
private final AtomicBoolean flushScheduled = new AtomicBoolean();
private final ConnectionAdapterImpl parent;
private final InetSocketAddress address;
private final long maxBarrierNanos;
private final long maxWorkTime;
- private final int queueSize;
private final T handler;
// Updated from netty only
private long lastBarrierNanos = System.nanoTime();
+ private OutboundQueueCacheSlice slice;
private OutboundQueueImpl currentQueue;
private boolean barrierTimerEnabled;
private int nonBarrierMessages;
};
OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
- final int queueSize, final long maxBarrierNanos) {
+ final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
this.parent = Preconditions.checkNotNull(parent);
this.handler = Preconditions.checkNotNull(handler);
- Preconditions.checkArgument(queueSize > 0);
- this.queueSize = queueSize;
+ this.slice = Preconditions.checkNotNull(slice);
Preconditions.checkArgument(maxBarrierNanos > 0);
this.maxBarrierNanos = maxBarrierNanos;
this.address = address;
this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
- LOG.debug("Queue manager instantiated with queue size {}", queueSize);
+ LOG.debug("Queue manager instantiated with queue slice {}", slice);
createQueue();
}
@Override
public void close() {
handler.onConnectionQueueChanged(null);
- }
-
- private void retireQueue(final OutboundQueueImpl queue) {
- if (queueCache.offer(queue)) {
- LOG.trace("Saving queue {} for later reuse", queue);
- } else {
- LOG.trace("Queue {} thrown away", queue);
+ if (slice != null) {
+ slice.decRef();
+ slice = null;
}
}
private void createQueue() {
final long baseXid = lastXid;
- lastXid += queueSize + 1;
-
- final OutboundQueueImpl cached = queueCache.poll();
- final OutboundQueueImpl queue;
- if (cached != null) {
- queue = cached.reuse(baseXid);
- LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
- } else {
- queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
- LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
- }
+ lastXid += slice.getQueueSize() + 1;
+ final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
activeQueues.add(queue);
currentQueue = queue;
handler.onConnectionQueueChanged(queue);
lastBarrierNanos = now;
} else {
nonBarrierMessages++;
- if (nonBarrierMessages >= queueSize) {
+ if (nonBarrierMessages >= slice.getQueueSize()) {
LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
scheduleBarrierMessage();
} else if (!barrierTimerEnabled) {
LOG.trace("Queue {} is implied finished", q);
q.completeAll();
it.remove();
- retireQueue(q);
+ slice.putQueue(q);
} else {
break;
}
if (queue.isFinished()) {
LOG.trace("Queue {} is finished", queue);
it.remove();
- retireQueue(queue);
+ slice.putQueue(queue);
}
return true;
}
/**
- * Perform a single flush operation.
+ * Perform a single flush operation. We keep it here so we do not generate
+ * synthetic accessors for private fields. Otherwise it could be moved into
+ * {@link #flushRunnable}.
*/
protected void flush() {
// If the channel is gone, just flush whatever is not completed
long messages = 0;
for (;; ++messages) {
if (!parent.getChannel().isWritable()) {
- LOG.trace("Channel is no longer writable");
+ LOG.debug("Channel {} is no longer writable", parent.getChannel());
break;
}
*/
private void conditionalFlush() {
if (currentQueue.needsFlush()) {
- scheduleFlush();
+ if (shutdownOffset != null || parent.getChannel().isWritable()) {
+ scheduleFlush();
+ } else {
+ LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
+ }
} else {
LOG.trace("Queue is empty, no flush needed");
}
conditionalFlush(ctx);
}
+ public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
+ /*
+ * Tune channel write buffering. We increase the writability window
+ * to ensure we can flush an entire queue segment in one go. We definitely
+ * want to keep the difference above 64k, as that will ensure we use jam-packed
+ * TCP packets. UDP will fragment as appropriate.
+ */
+ ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
+ ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
+
+ super.handlerAdded(ctx);
+ }
+
@Override
public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
- conditionalFlush(ctx);
+
+ if (flushScheduled.compareAndSet(false, true)) {
+ LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
+ flush();
+ } else {
+ LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
+ }
}
@Override
* the flush task, which will deal with the rest of the shutdown process.
*/
shutdownOffset = currentQueue.startShutdown();
- queueCache.clear();
+ if (slice != null) {
+ slice.decRef();
+ slice = null;
+ }
+
LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
scheduleFlush();
}