import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
conditionalFlush(ctx);
}
+ @Override
+ public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
+ super.channelInactive(ctx);
+
+ long entries = 0;
+ LOG.debug("Channel shutdown, flushing queue...");
+ final Future<Void> result = ctx.newFailedFuture(new RejectedExecutionException("Channel disconnected"));
+ while (true) {
+ final MessageHolder<?> e = queue.poll();
+ if (e == null) {
+ break;
+ }
+
+ e.takeListener().operationComplete(result);
+ entries++;
+ }
+
+ LOG.debug("Flushed {} queue entries", entries);
+ }
+
@Override
public String toString() {
return String.format("Channel %s queue [%s messages flushing=%s]", channel, queue.size(), flushScheduled);