final long xid = LAST_XID_OFFSET_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
- // Fails all uncompleted entries, because they will never be completed due to disconnected channel.
- return lockedFailSegments(uncompletedSegments.iterator());
+ return lockedShutdownFlush();
}
}
- /**
- * Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
- * and fails all not completed entries (if in final phase)
- * @return true if in final phase, false if a flush is needed
- */
boolean finishShutdown() {
- boolean needsFlush;
synchronized (unflushedSegments) {
- // Fails all entries, that were flushed in shutdownOffset (became uncompleted)
- // - they will never be completed due to disconnected channel.
- lockedFailSegments(uncompletedSegments.iterator());
- // If no further flush is needed, than we fail all unflushed segments, so that each enqueued entry
- // is reported as unsuccessful due to channel disconnection. No further entries should be enqueued
- // by this time.
- needsFlush = needsFlush();
- if (!needsFlush) {
- lockedFailSegments(unflushedSegments.iterator());
- }
+ lockedShutdownFlush();
}
- return !needsFlush;
+
+ return !needsFlush();
}
protected OutboundQueueEntry getEntry(final Long xid) {
return fastSegment.getEntry(fastOffset);
}
- /**
- * Fails not completed entries in segments and frees completed segments
- * @param iterator list of segments to be failed
- * @return number of failed entries
- */
@GuardedBy("unflushedSegments")
- private long lockedFailSegments(Iterator<StackedSegment> iterator) {
+ private long lockedShutdownFlush() {
long entries = 0;
// Fail all queues
- while (iterator.hasNext()) {
- final StackedSegment segment = iterator.next();
+ final Iterator<StackedSegment> it = uncompletedSegments.iterator();
+ while (it.hasNext()) {
+ final StackedSegment segment = it.next();
entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
if (segment.isComplete()) {
LOG.trace("Cleared segment {}", segment);
- iterator.remove();
+ it.remove();
}
}
return entries;
}
-
}