this.baseXid = baseXid;
this.endXid = baseXid + queue.length;
this.reserve = queue.length - 1;
+ }
+
+ void retire() {
for (OutboundQueueEntry element : queue) {
element.reset();
}
Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
}
+ final int ro = reserveOffset;
+ Preconditions.checkArgument(offset < ro, "Unexpected commit to offset %s reserved %s message %s", offset, ro, message);
+
final OutboundQueueEntry entry = queue[offset];
entry.commit(message, callback);
- LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+ LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, ro);
if (entry.isBarrier()) {
int my = offset;
}
}
+ int startShutdown() {
+ // Increment the offset by the queue size, hence preventing any normal
+ // allocations. We should not be seeing a barrier reservation after this
+ // and if there is one issued, we can disregard it.
+ final int offset = CURRENT_OFFSET_UPDATER.getAndAdd(this, queue.length);
+
+ // If this offset is larger than reserve, trim it. That is not an accurate
+ // view of which slot was actually "reserved", but it indicates at which
+ // entry we can declare the queue flushed (e.g. at the emergency slot).
+ return offset > reserve ? reserve : offset;
+ }
+
+ boolean isShutdown(final int offset) {
+ // This queue is shutdown if the flushOffset (e.g. the next entry to
+ // be flushed) points to the offset 'reserved' in startShutdown()
+ return flushOffset >= offset;
+ }
+
/**
* An empty queue is a queue which has no further unflushed entries.
*
* @return True if this queue does not have unprocessed entries.
*/
- boolean isEmpty() {
+ private boolean isEmpty() {
int ro = reserveOffset;
if (ro >= reserve) {
if (queue[reserve].isCommitted()) {
}
boolean isFlushed() {
- LOG.debug("Check queue {} for completeness (offset {}, reserve {})", flushOffset, reserve);
+ LOG.debug("Check queue {} for completeness (offset {}, reserve {})", this, flushOffset, reserve);
if (flushOffset < reserve) {
return false;
}
return flushOffset >= queue.length || !queue[reserve].isCommitted();
}
+ boolean needsFlush() {
+ if (flushOffset < reserve) {
+ return queue[flushOffset].isCommitted();
+ }
+
+ if (isFlushed()) {
+ LOG.trace("Queue {} is flushed, schedule a replace", this);
+ return true;
+ }
+ if (isFinished()) {
+ LOG.trace("Queue {} is finished, schedule a cleanup", this);
+ return true;
+ }
+
+ return false;
+ }
+
OfHeader flushEntry() {
for (;;) {
// No message ready
if (isEmpty()) {
- LOG.trace("Flushed all reserved entries up to ", flushOffset);
+ LOG.trace("Flushed all reserved entries up to {}", flushOffset);
return null;
}
return null;
}
- final OfHeader msg = entry.getMessage();
+ final OfHeader msg = entry.takeMessage();
flushOffset++;
if (msg != null) {
return msg;