In the case of multiple segments being present, we do not need to
completely synchronize if the corresponding segment has already been
allocated. Communicate this fact when allocating segments and check this
flag before embarking on the entire synchronized affair during
reserve().
Change-Id: I10a9ecb141f619b94a20d74152124134d36591b1
Signed-off-by: Robert Varga <rovarga@cisco.com>
private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
private final OutboundQueueManager<?> manager;
private final List<StackedSegment> uncompletedSegments = new ArrayList<>(2);
private final OutboundQueueManager<?> manager;
- private volatile long lastXid = -1;
+ private volatile long allocatedXid = -1;
private volatile long barrierXid = -1;
private volatile long barrierXid = -1;
+ private volatile long lastXid = -1;
@GuardedBy("unflushedSegments")
private Integer shutdownOffset;
@GuardedBy("unflushedSegments")
private Integer shutdownOffset;
LOG.debug("Adding segment {}", newSegment);
unflushedSegments.add(newSegment);
}
LOG.debug("Adding segment {}", newSegment);
unflushedSegments.add(newSegment);
}
+
+ allocatedXid = uncompletedSegments.get(uncompletedSegments.size() - 1).getEndXid();
final StackedSegment fastSegment = firstSegment;
if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) {
final StackedSegment fastSegment = firstSegment;
if (xid >= fastSegment.getBaseXid() + StackedSegment.SEGMENT_SIZE) {
- LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid);
+ if (xid >= allocatedXid) {
+ // Multiple segments, this a slow path
+ LOG.debug("Queue {} falling back to slow reservation for XID {}", this, xid);
- // Multiple segments, this a slow path
- synchronized (unflushedSegments) {
- LOG.debug("Queue {} executing slow reservation for XID {}", this, xid);
+ synchronized (unflushedSegments) {
+ LOG.debug("Queue {} executing slow reservation for XID {}", this, xid);
- // Shutdown was scheduled, need to fail the reservation
- if (shutdownOffset != null) {
- LOG.debug("Queue {} is being shutdown, failing reservation", this);
- return null;
- }
+ // Shutdown was scheduled, need to fail the reservation
+ if (shutdownOffset != null) {
+ LOG.debug("Queue {} is being shutdown, failing reservation", this);
+ return null;
+ }
- // Ensure we have the appropriate segment for the specified XID
- final StackedSegment slowSegment = firstSegment;
- final int slowOffset = (int) (xid - slowSegment.getBaseXid());
- Verify.verify(slowOffset >= 0);
+ // Ensure we have the appropriate segment for the specified XID
+ final StackedSegment slowSegment = firstSegment;
+ final int slowOffset = (int) (xid - slowSegment.getBaseXid());
+ Verify.verify(slowOffset >= 0);
- // Now, we let's see if we need to allocate a new segment
- ensureSegment(slowSegment, slowOffset);
+ // Now, we let's see if we need to allocate a new segment
+ ensureSegment(slowSegment, slowOffset);
- LOG.debug("Queue {} slow reservation finished", this);
+ LOG.debug("Queue {} slow reservation finished", this);
+ }
+ } else {
+ LOG.debug("Queue {} XID {} is already backed", this, xid);
+ long getEndXid() {
+ return endXid;
+ }
+
OutboundQueueEntry getEntry(final int offset) {
return entries[offset];
}
OutboundQueueEntry getEntry(final int offset) {
return entries[offset];
}