return;
}
- final long now = System.nanoTime();
- final long sinceLast = now - lastBarrierNanos;
- if (sinceLast >= maxBarrierNanos) {
- LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
- // FIXME: we should be tracking requests/responses instead of this
- if (nonBarrierMessages == 0) {
- LOG.trace("No messages written since last barrier, not issuing one");
- } else {
- scheduleBarrierMessage();
- }
+ if (currentQueue.isBarrierNeeded()) {
+ LOG.trace("Sending a barrier message");
+ scheduleBarrierMessage();
+ } else {
+ LOG.trace("Barrier not needed, not issuing one");
}
}
}
Long reserveBarrierIfNeeded() {
+ if (isBarrierNeeded()) {
+ return reserveEntry();
+ }
+ return null;
+ }
+
+ /**
+ * Checks if Barrier Request is the last message enqueued. If not, one needs
+ * to be scheduled in order to collect data about previous messages.
+ * @return true if last enqueued message is Barrier Request, false otherwise
+ */
+ boolean isBarrierNeeded() {
final long bXid = barrierXid;
final long fXid = firstSegment.getBaseXid() + flushOffset;
if (bXid >= fXid) {
LOG.debug("Barrier found at XID {} (currently at {})", bXid, fXid);
- return null;
+ return false;
}
- return reserveEntry();
+ return true;
}
}