private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
+ private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
private static final long FLUSH_RETRY_NANOS = 1L;
private final OutboundQueueManager<?> manager;
private final OutboundQueueEntry[] queue;
private final int reserve;
// Updated concurrently
- private volatile int reserveOffset;
+ private volatile int barrierOffset = -1;
+ private volatile int reserveOffset = 0;
// Updated from Netty only
private int flushOffset;
return new OutboundQueueImpl(manager, baseXid, queue);
}
- Long reserveEntry(final boolean forBarrier) {
+ @Override
+ public Long reserveEntry() {
+ return reserveEntry(false);
+ }
+
+ @Override
+ public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+ final int offset = (int)(xid - baseXid);
+ if (message != null) {
+ Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+ }
+
+ final OutboundQueueEntry entry = queue[offset];
+ entry.commit(message, callback);
+ LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+
+ if (entry.isBarrier()) {
+ int my = offset;
+ for (;;) {
+ final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
+ if (prev < my) {
+ LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
+ break;
+ }
+
+ // We have traveled back, recover
+ my = prev;
+ }
+ }
+
+ manager.ensureFlushing(this);
+ }
+
+ private Long reserveEntry(final boolean forBarrier) {
final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
if (offset >= reserve) {
if (forBarrier) {
return xid;
}
- @Override
- public Long reserveEntry() {
- return reserveEntry(false);
- }
-
- @Override
- public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
- final int offset = (int)(xid - baseXid);
- if (message != null) {
- Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+ Long reserveBarrierIfNeeded() {
+ final int bo = barrierOffset;
+ if (bo >= flushOffset) {
+ LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
+ return null;
+ } else {
+ return reserveEntry(true);
}
-
- queue[offset].commit(message, callback);
- LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
-
- manager.ensureFlushing(this);
}
/**