return firstSegment.getEntry(flushOffset).isCommitted();
}
- long startShutdown(final Channel channel) {
+ long startShutdown() {
/*
* We are dealing with a multi-threaded shutdown, as the user may still
* be reserving entries in the queue. We are executing in a netty thread,
/**
* Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
* and fails all not completed entries (if in final phase)
+ * @param channel netty channel
* @return true if in final phase, false if a flush is needed
*/
- boolean finishShutdown() {
+ boolean finishShutdown(final Channel channel) {
boolean needsFlush;
synchronized (unflushedSegments) {
// Fails all entries, that were flushed in shutdownOffset (became uncompleted)
// - they will never be completed due to disconnected channel.
lockedFailSegments(uncompletedSegments.iterator());
- // If no further flush is needed, than we fail all unflushed segments, so that each enqueued entry
- // is reported as unsuccessful due to channel disconnection. No further entries should be enqueued
- // by this time.
- needsFlush = needsFlush();
+ // If no further flush is needed or we are not able to write to channel anymore, then we fail all unflushed
+ // segments, so that each enqueued entry is reported as unsuccessful due to channel disconnection.
+ // No further entries should be enqueued by this time.
+ needsFlush = channel.isWritable() && needsFlush();
if (!needsFlush) {
lockedFailSegments(unflushedSegments.iterator());
}