this.baseXid = baseXid;
this.endXid = baseXid + queue.length;
this.reserve = queue.length - 1;
+ }
+
+ void retire() {
for (OutboundQueueEntry element : queue) {
element.reset();
}
}
- OutboundQueueImpl reuse(final long baseXid) {
+ OutboundQueueImpl reuse(final OutboundQueueManager<?> manager, final long baseXid) {
return new OutboundQueueImpl(manager, baseXid, queue);
}
Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
}
+ final int ro = reserveOffset;
+ Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
+
final OutboundQueueEntry entry = queue[offset];
entry.commit(message, callback);
- LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+ LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
if (entry.isBarrier()) {
int my = offset;
}
// We have traveled back, recover
+ LOG.debug("Queue {} retry pending barrier {} >= {}", this, prev, my);
my = prev;
}
}
}
boolean isFlushed() {
- LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
+ LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
if (flushOffset < reserve) {
return false;
}
for (;;) {
// No message ready
if (isEmpty()) {
- LOG.trace("Flushed all reserved entries up to ", flushOffset);
+ LOG.trace("Flushed all reserved entries up to {}", flushOffset);
return null;
}
return null;
}
- final OfHeader msg = entry.getMessage();
+ final OfHeader msg = entry.takeMessage();
flushOffset++;
if (msg != null) {
return msg;