// Updated from Netty only
private int flushOffset;
private int completeCount;
+ private int lastBarrierOffset = -1;
OutboundQueueImpl(final OutboundQueueManager<?> manager, final long baseXid, final int maxQueue) {
/*
// This has been a barrier -- make sure we complete all preceding requests
if (entry.isBarrier()) {
- LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid, xid - 1);
- for (int i = 0; i < offset; ++i) {
- final OutboundQueueEntry e = queue[i];
- if (!e.isCompleted() && e.complete(null)) {
- completeCount++;
- }
- }
+ LOG.debug("Barrier XID {} completed, cascading completion to XIDs {} to {}", xid, baseXid + lastBarrierOffset + 1, xid - 1);
+ completeRequests(offset);
+ lastBarrierOffset = offset;
}
}
return entry;
}
- void completeAll() {
- for (OutboundQueueEntry entry : queue) {
+ private void completeRequests(final int toOffset) {
+ for (int i = lastBarrierOffset + 1; i < toOffset; ++i) {
+ final OutboundQueueEntry entry = queue[i];
if (!entry.isCompleted() && entry.complete(null)) {
completeCount++;
}
}
}
+ void completeAll() {
+ completeRequests(queue.length);
+ }
+
int failAll(final Throwable cause) {
int ret = 0;
- for (OutboundQueueEntry entry : queue) {
+ for (int i = lastBarrierOffset + 1; i < queue.length; ++i) {
+ final OutboundQueueEntry entry = queue[i];
if (!entry.isCompleted()) {
entry.fail(cause);
ret++;