unflushedSegments.add(newSegment);
}
- allocatedXid = uncompletedSegments.get(uncompletedSegments.size() - 1).getEndXid();
+ allocatedXid = unflushedSegments.get(unflushedSegments.size() - 1).getEndXid();
}
/*
return entries;
}
- abstract boolean pairRequest(final OfHeader message);
+ boolean pairRequest(final OfHeader message) {
+ Iterator<StackedSegment> it = uncompletedSegments.iterator();
+ while (it.hasNext()) {
+ final StackedSegment queue = it.next();
+ final OutboundQueueEntry entry = queue.pairRequest(message);
+ if (entry == null) {
+ continue;
+ }
+
+ LOG.trace("Queue {} accepted response {}", queue, message);
+
+ // This has been a barrier request, we need to flush all
+ // previous queues
+ if (entry.isBarrier() && uncompletedSegments.size() > 1) {
+ LOG.trace("Queue {} indicated request was a barrier", queue);
+
+ it = uncompletedSegments.iterator();
+ while (it.hasNext()) {
+ final StackedSegment q = it.next();
+
+ // We want to complete all queues before the current one, we will
+ // complete the current queue below
+ if (!queue.equals(q)) {
+ LOG.trace("Queue {} is implied finished", q);
+ q.completeAll();
+ it.remove();
+ q.recycle();
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (queue.isComplete()) {
+ LOG.trace("Queue {} is finished", queue);
+ it.remove();
+ queue.recycle();
+ }
+
+ return true;
+ }
+
+ LOG.debug("Failed to find completion for message {}", message);
+ return false;
+ }
boolean needsFlush() {
// flushOffset always points to the first entry, which can be changed only
final long xid = LAST_XID_OFFSET_UPDATER.addAndGet(this, StackedSegment.SEGMENT_SIZE);
shutdownOffset = (int) (xid - firstSegment.getBaseXid() - StackedSegment.SEGMENT_SIZE);
- return lockedShutdownFlush();
+ // Fails all uncompleted entries, because they will never be completed due to disconnected channel.
+ return lockedFailSegments(uncompletedSegments.iterator());
}
}
+ /**
+ * Checks if the shutdown is in final phase -> all allowed entries (number of entries < shutdownOffset) are flushed
+ * and fails all not completed entries (if in final phase)
+ * @return true if in final phase, false if a flush is needed
+ */
boolean finishShutdown() {
+ boolean needsFlush;
synchronized (unflushedSegments) {
- lockedShutdownFlush();
+ // Fails all entries, that were flushed in shutdownOffset (became uncompleted)
+ // - they will never be completed due to disconnected channel.
+ lockedFailSegments(uncompletedSegments.iterator());
+ // If no further flush is needed, than we fail all unflushed segments, so that each enqueued entry
+ // is reported as unsuccessful due to channel disconnection. No further entries should be enqueued
+ // by this time.
+ needsFlush = needsFlush();
+ if (!needsFlush) {
+ lockedFailSegments(unflushedSegments.iterator());
+ }
}
+ return !needsFlush;
+ }
+
+ protected OutboundQueueEntry getEntry(final Long xid) {
+ final StackedSegment fastSegment = firstSegment;
+ final long calcOffset = xid - fastSegment.getBaseXid();
+ Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
- return !needsFlush();
+ Verify.verify(calcOffset <= Integer.MAX_VALUE);
+ final int fastOffset = (int) calcOffset;
+
+ if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
+ LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
+
+ final StackedSegment segment;
+ final int slowOffset;
+ synchronized (unflushedSegments) {
+ final StackedSegment slowSegment = firstSegment;
+ final long slowCalcOffset = xid - slowSegment.getBaseXid();
+ Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
+ slowOffset = (int) slowCalcOffset;
+
+ LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
+ segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
+ }
+
+ final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
+ LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
+ return segment.getEntry(segOffset);
+ }
+ return fastSegment.getEntry(fastOffset);
}
+ /**
+ * Fails not completed entries in segments and frees completed segments
+ * @param iterator list of segments to be failed
+ * @return number of failed entries
+ */
@GuardedBy("unflushedSegments")
- private long lockedShutdownFlush() {
+ private long lockedFailSegments(Iterator<StackedSegment> iterator) {
long entries = 0;
// Fail all queues
- final Iterator<StackedSegment> it = uncompletedSegments.iterator();
- while (it.hasNext()) {
- final StackedSegment segment = it.next();
+ while (iterator.hasNext()) {
+ final StackedSegment segment = iterator.next();
entries += segment.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
if (segment.isComplete()) {
LOG.trace("Cleared segment {}", segment);
- it.remove();
+ iterator.remove();
}
}
return entries;
}
+
}