final long xid = LAST_XID_OFFSET_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
- return lockedShutdownFlush();
+ // Fails all uncompleted entries, because they will never be completed due to disconnected channel.
+ return lockedFailSegments(uncompletedSegments.iterator());
}
}
+ /**
+ * Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
+ * and fails all not completed entries (if in final phase)
+ * @return true if in final phase, false if a flush is needed
+ */
boolean finishShutdown() {
+ boolean needsFlush;
synchronized (unflushedSegments) {
- lockedShutdownFlush();
+ // Fails all entries, that were flushed in shutdownOffset (became uncompleted)
+ // - they will never be completed due to disconnected channel.
+ lockedFailSegments(uncompletedSegments.iterator());
+ // If no further flush is needed, than we fail all unflushed segments, so that each enqueued entry
+ // is reported as unsuccessful due to channel disconnection. No further entries should be enqueued
+ // by this time.
+ needsFlush = needsFlush();
+ if (!needsFlush) {
+ lockedFailSegments(unflushedSegments.iterator());
+ }
}
-
- return !needsFlush();
+ return !needsFlush;
}
protected OutboundQueueEntry getEntry(final Long xid) {
return fastSegment.getEntry(fastOffset);
}
+ /**
+ * Fails not completed entries in segments and frees completed segments
+ * @param iterator list of segments to be failed
+ * @return number of failed entries
+ */
@GuardedBy("unflushedSegments")
- private long lockedShutdownFlush() {
+ private long lockedFailSegments(Iterator<StackedSegment> iterator) {
long entries = 0;
// Fail all queues
- final Iterator<StackedSegment> it = uncompletedSegments.iterator();
- while (it.hasNext()) {
- final StackedSegment segment = it.next();
+ while (iterator.hasNext()) {
+ final StackedSegment segment = iterator.next();
entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
if (segment.isComplete()) {
LOG.trace("Cleared segment {}", segment);
- it.remove();
+ iterator.remove();
}
}
return entries;
}
+
}