- sometimes (on disconnect) there can be still some unflushed segments
but they are not able to be flushed if channel is not writable anymore
and we get to infinite loop of flushing (but not writing)
Change-Id: I74cac21b4635e22f5b8d63f3a602f40796108059
Signed-off-by: Andrej Leitner <andrej.leitner@pantheon.tech>
// Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
// indefinitely) and failing not completed entries.
shuttingDown = true;
// Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
// indefinitely) and failing not completed entries.
shuttingDown = true;
- final long entries = currentQueue.startShutdown(ctx.channel());
+ final long entries = currentQueue.startShutdown();
LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
// Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
// Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
writeAndFlush();
rescheduleFlush();
LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
writeAndFlush();
rescheduleFlush();
- } else if (currentQueue.finishShutdown()) {
+ } else if (currentQueue.finishShutdown(parent.getChannel())) {
close();
LOG.debug("Channel {} shutdown complete", parent.getChannel());
} else {
close();
LOG.debug("Channel {} shutdown complete", parent.getChannel());
} else {
return firstSegment.getEntry(flushOffset).isCommitted();
}
return firstSegment.getEntry(flushOffset).isCommitted();
}
- long startShutdown(final Channel channel) {
/*
* We are dealing with a multi-threaded shutdown, as the user may still
* be reserving entries in the queue. We are executing in a netty thread,
/*
* We are dealing with a multi-threaded shutdown, as the user may still
* be reserving entries in the queue. We are executing in a netty thread,
/**
* Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
* and fails all not completed entries (if in final phase)
/**
* Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
* and fails all not completed entries (if in final phase)
+ * @param channel netty channel
* @return true if in final phase, false if a flush is needed
*/
* @return true if in final phase, false if a flush is needed
*/
- boolean finishShutdown() {
+ boolean finishShutdown(final Channel channel) {
boolean needsFlush;
synchronized (unflushedSegments) {
// Fails all entries, that were flushed in shutdownOffset (became uncompleted)
// - they will never be completed due to disconnected channel.
lockedFailSegments(uncompletedSegments.iterator());
boolean needsFlush;
synchronized (unflushedSegments) {
// Fails all entries, that were flushed in shutdownOffset (became uncompleted)
// - they will never be completed due to disconnected channel.
lockedFailSegments(uncompletedSegments.iterator());
- // If no further flush is needed, than we fail all unflushed segments, so that each enqueued entry
- // is reported as unsuccessful due to channel disconnection. No further entries should be enqueued
- // by this time.
- needsFlush = needsFlush();
+ // If no further flush is needed or we are not able to write to channel anymore, then we fail all unflushed
+ // segments, so that each enqueued entry is reported as unsuccessful due to channel disconnection.
+ // No further entries should be enqueued by this time.
+ needsFlush = channel.isWritable() && needsFlush();
if (!needsFlush) {
lockedFailSegments(unflushedSegments.iterator());
}
if (!needsFlush) {
lockedFailSegments(unflushedSegments.iterator());
}