private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
private final OutboundQueueManager<?> manager;
- private volatile long lastXid = -1;
+ private volatile long allocatedXid = -1;
private volatile long barrierXid = -1;
+ private volatile long lastXid = -1;
@GuardedBy("unflushedSegments")
private Integer shutdownOffset;
LOG.debug("Adding segment {}", newSegment);
unflushedSegments.add(newSegment);
}
+
+ allocatedXid = uncompletedSegments.get(uncompletedSegments.size() - 1).getEndXid();
}
/*
final StackedSegment fastSegment = firstSegment;
if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) {
- LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid);
+ if (xid >= allocatedXid) {
+ // Multiple segments, this a slow path
+ LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid);
- // Multiple segments, this a slow path
- synchronized (unflushedSegments) {
- LOG.debug("Queue {} executing slow reservation for XID {}", this, xid);
+ synchronized (unflushedSegments) {
+ LOG.debug("Queue {} executing slow reservation for XID {}", this, xid);
- // Shutdown was scheduled, need to fail the reservation
- if (shutdownOffset != null) {
- LOG.debug("Queue {} is being shutdown, failing reservation", this);
- return null;
- }
+ // Shutdown was scheduled, need to fail the reservation
+ if (shutdownOffset != null) {
+ LOG.debug("Queue {} is being shutdown, failing reservation", this);
+ return null;
+ }
- // Ensure we have the appropriate segment for the specified XID
- final StackedSegment slowSegment = firstSegment;
- final int slowOffset = (int) (xid - slowSegment.getBaseXid());
- Verify.verify(slowOffset >= 0);
+ // Ensure we have the appropriate segment for the specified XID
+ final StackedSegment slowSegment = firstSegment;
+ final int slowOffset = (int) (xid - slowSegment.getBaseXid());
+ Verify.verify(slowOffset >= 0);
- // Now, we let's see if we need to allocate a new segment
- ensureSegment(slowSegment, slowOffset);
+ // Now, we let's see if we need to allocate a new segment
+ ensureSegment(slowSegment, slowOffset);
- LOG.debug("Queue {} slow reservation finished", this);
+ LOG.debug("Queue {} slow reservation finished", this);
+ }
+ } else {
+ LOG.debug("Queue {} XID {} is already backed", this, xid);
}
}