this.baseXid = baseXid;
this.endXid = baseXid + queue.length;
this.reserve = queue.length - 1;
+ }
+
+ void retire() {
for (OutboundQueueEntry element : queue) {
element.reset();
}
}
final int ro = reserveOffset;
- Preconditions.checkArgument(offset < ro, "Unexpected commit to offset {} reserved {} message {}", offset, ro, message);
+ Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
final OutboundQueueEntry entry = queue[offset];
entry.commit(message, callback);
return null;
}
- final OfHeader msg = entry.getMessage();
+ final OfHeader msg = entry.takeMessage();
flushOffset++;
if (msg != null) {
return msg;