-
- /**
-  * Initiates queue shutdown. Fails all currently-uncompleted entries and
-  * returns the number of entries that were failed in this pass.
-  *
-  * NOTE(review): the {@code channel} parameter is not referenced in this
-  * method body — presumably kept for interface symmetry; confirm with callers.
-  */
- long startShutdown(final Channel channel) {
- /*
- * We are dealing with a multi-threaded shutdown, as the user may still
- * be reserving entries in the queue. We are executing in a netty thread,
- * so neither flush nor barrier can be running, which is good news.
- *
- * We will eat up all the slots in the queue here and mark the offset first
- * reserved offset and free up all the cached queues. We then schedule
- * the flush task, which will deal with the rest of the shutdown process.
- */
- synchronized (unflushedSegments) {
- // Increment the offset by the segment size, preventing fast path allocations,
- // since we are holding the slow path lock, any reservations will see the queue
- // in shutdown and fail accordingly.
- final long xid = LAST_XID_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
- // Record where shutdown begins: the offset of the first XID that can no
- // longer be reserved, relative to the first segment's base XID.
- shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
-
- // Fail everything already queued while still holding the lock.
- return lockedShutdownFlush();
- }
- }
-
- /**
-  * Fails every entry in all uncompleted segments with
-  * {@link OutboundQueueException#DEVICE_DISCONNECTED}, removing segments that
-  * become complete as a result. Must be called while holding the
-  * {@code unflushedSegments} monitor (see {@code @GuardedBy}).
-  *
-  * @return total number of entries failed across all segments
-  */
- @GuardedBy("unflushedSegments")
- private long lockedShutdownFlush() {
- long entries = 0;
-
- // Fail all queues
- final Iterator<StackedSegment> it = uncompletedSegments.iterator();
- while (it.hasNext()) {
- final StackedSegment segment = it.next();
-
- entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
- // Segments fully drained by failAll() are dropped via the iterator,
- // which is the safe way to remove during iteration.
- if (segment.isComplete()) {
- LOG.trace("Cleared segment {}", segment);
- it.remove();
- }
- }
-
- return entries;
- }
-
- /**
-  * Performs a final shutdown flush pass under the {@code unflushedSegments}
-  * lock and reports whether shutdown can complete.
-  *
-  * The entry count returned by {@code lockedShutdownFlush()} is deliberately
-  * ignored here — only the resulting queue state matters.
-  *
-  * @return true if no further flushing is needed, i.e. shutdown is finished
-  */
- boolean finishShutdown() {
- synchronized (unflushedSegments) {
- lockedShutdownFlush();
- }
-
- return !needsFlush();
- }
-
- /**
-  * Checks whether there is at least one committed entry waiting to be
-  * flushed.
-  *
-  * @return true if the entry at {@code flushOffset} exists, lies before any
-  *         shutdown boundary, and has been committed
-  */
- boolean needsFlush() {
- // flushOffset always points to the first entry, which can be changed only
- // from Netty, so we are fine here.
- // Nothing reserved at or beyond flushOffset: nothing to flush.
- if (firstSegment.getBaseXid() + flushOffset > lastXid) {
- return false;
- }
-
- // shutdownOffset != null means startShutdown() has run; entries at or past
- // the shutdown boundary were never accepted, so they need no flush.
- if (shutdownOffset != null && flushOffset >= shutdownOffset) {
- return false;
- }
-
- // An entry only needs flushing once its producer has committed it.
- return firstSegment.getEntry(flushOffset).isCommitted();
- }