Original code did a full queue scan up to the current request. This
is inefficient if there are previously-completed barriers, as we end up
checking the same slots multiple times.
Remember the offset of last completed future and only flush slots from
that offset on subsequent barrier completion.
Change-Id: I9715f9f7818de611c01d0cd2eaa7637ac5372e91
Signed-off-by: Robert Varga <rovarga@cisco.com>
(cherry picked from commit
7da0dcda4b5d9e5c4510ceb8e4d7840696fbdd90)
// Updated from Netty only
private int flushOffset;
private int completeCount;
// Updated from Netty only
private int flushOffset;
private int completeCount;
+ private int lastBarrierOffset = -1;
OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
/*
OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
/*
// This has been a barrier -- make sure we complete all preceding requests
if (entry.isBarrier()) {
// This has been a barrier -- make sure we complete all preceding requests
if (entry.isBarrier()) {
- LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid, xid - 1);
- for (int i = 0; i < offset; ++i) {
- final OutboundQueueEntry e = queue[i];
- if (!e.isCompleted() && e.complete(null)) {
- completeCount++;
- }
- }
+ LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
+ completeRequests(offset);
+ lastBarrierOffset = offset;
- void completeAll() {
- for (OutboundQueueEntry entry : queue) {
+ private void completeRequests(final int toOffset) {
+ for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
+ final OutboundQueueEntry entry = queue[i];
if (!entry.isCompleted() && entry.complete(null)) {
completeCount++;
}
}
}
if (!entry.isCompleted() && entry.complete(null)) {
completeCount++;
}
}
}
+ void completeAll() {
+ completeRequests(queue.length);
+ }
+
int failAll(final Throwable cause) {
int ret = 0;
int failAll(final Throwable cause) {
int ret = 0;
- for (OutboundQueueEntry entry : queue) {
+ for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
+ final OutboundQueueEntry entry = queue[i];
if (!entry.isCompleted()) {
entry.fail(cause);
ret++;
if (!entry.isCompleted()) {
entry.fail(cause);
ret++;