return entries;
}
- abstract boolean pairRequest(final OfHeader message);
+ boolean pairRequest(final OfHeader message) {
+ Iterator<StackedSegment> it = uncompletedSegments.iterator();
+ while (it.hasNext()) {
+ final StackedSegment queue = it.next();
+ final OutboundQueueEntry entry = queue.pairRequest(message);
+ if (entry == null) {
+ continue;
+ }
+
+ LOG.trace("Queue {} accepted response {}", queue, message);
+
+ // This has been a barrier request, we need to flush all
+ // previous queues
+ if (entry.isBarrier() && uncompletedSegments.size() > 1) {
+ LOG.trace("Queue {} indicated request was a barrier", queue);
+
+ it = uncompletedSegments.iterator();
+ while (it.hasNext()) {
+ final StackedSegment q = it.next();
+
+ // We want to complete all queues before the current one, we will
+ // complete the current queue below
+ if (!queue.equals(q)) {
+ LOG.trace("Queue {} is implied finished", q);
+ q.completeAll();
+ it.remove();
+ q.recycle();
+ } else {
+ break;
+ }
+ }
+ }
+
+ if (queue.isComplete()) {
+ LOG.trace("Queue {} is finished", queue);
+ it.remove();
+ queue.recycle();
+ }
+
+ return true;
+ }
+
+ LOG.debug("Failed to find completion for message {}", message);
+ return false;
+ }
boolean needsFlush() {
// flushOffset always points to the first entry, which can be changed only
return !needsFlush();
}
+ protected OutboundQueueEntry getEntry(final Long xid) {
+ final StackedSegment fastSegment = firstSegment;
+ final long calcOffset = xid - fastSegment.getBaseXid();
+ Preconditions.checkArgument(calcOffset >= 0, "Commit of XID %s does not match up with base XID %s", xid, fastSegment.getBaseXid());
+
+ Verify.verify(calcOffset <= Integer.MAX_VALUE);
+ final int fastOffset = (int) calcOffset;
+
+ if (fastOffset >= StackedSegment.SEGMENT_SIZE) {
+ LOG.debug("Queue {} falling back to slow commit of XID {} at offset {}", this, xid, fastOffset);
+
+ final StackedSegment segment;
+ final int slowOffset;
+ synchronized (unflushedSegments) {
+ final StackedSegment slowSegment = firstSegment;
+ final long slowCalcOffset = xid - slowSegment.getBaseXid();
+ Verify.verify(slowCalcOffset >= 0 && slowCalcOffset <= Integer.MAX_VALUE);
+ slowOffset = (int) slowCalcOffset;
+
+ LOG.debug("Queue {} recalculated offset of XID {} to {}", this, xid, slowOffset);
+ segment = unflushedSegments.get(slowOffset / StackedSegment.SEGMENT_SIZE);
+ }
+
+ final int segOffset = slowOffset % StackedSegment.SEGMENT_SIZE;
+ LOG.debug("Queue {} slow commit of XID {} completed at offset {} (segment {} offset {})", this, xid, slowOffset, segment, segOffset);
+ return segment.getEntry(segOffset);
+ }
+ return fastSegment.getEntry(fastOffset);
+ }
+
@GuardedBy("unflushedSegments")
private long lockedShutdownFlush() {
long entries = 0;