// we'll steal its work. Note that more work may accumulate in the time window
// between now and when the task will run, so it may not be a no-op after all.
//
- // The reason for this is to will the output buffer before we go into selection
+ // The reason for this is to fill the output buffer before we go into selection
// phase. This will make sure the pipe is full (in which case our next wake up
// will be the queue becoming writable).
writeAndFlush();
+ alreadyReading = false;
}
@Override
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ // First of all, delegates disconnect event notification into ConnectionAdapter -> OF Plugin -> queue.close()
+ // -> queueHandler.onConnectionQueueChanged(null). The last call causes that no more entries are enqueued
+ // in the queue.
super.channelInactive(ctx);
LOG.debug("Channel {} initiating shutdown...", ctx.channel());
+ // Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
+ // indefinitely) and failing not completed entries.
shuttingDown = true;
final long entries = currentQueue.startShutdown(ctx.channel());
LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
+ // Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
+ // when there is more than shutdownOffset messages enqueued in unflushed segments
+ // (AbstractStackedOutboundQueue#finishShutdown()).
scheduleFlush();
}
*
* @return
*/
- private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
+ protected Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
Preconditions.checkArgument(msg != null);
if (address == null) {
LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
writeAndFlush();
rescheduleFlush();
- } else if (currentQueue.finishShutdown()) {
- close();
- LOG.debug("Channel {} shutdown complete", parent.getChannel());
} else {
- LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
- rescheduleFlush();
+ close();
+ if (currentQueue.finishShutdown()) {
+ LOG.debug("Channel {} shutdown complete", parent.getChannel());
+ } else {
+ LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+ rescheduleFlush();
+ }
}
}