In order to relieve pressure on memory, do not wait until the entire
buffer is acknowledged, but clear the message as soon as we send it to
the network.
Change-Id: Ie64c1e0a011e1d1a1a0a98a21d9cc90642f81e6c
Signed-off-by: Robert Varga <rovarga@cisco.com>
(cherry picked from commit
be5f0cac1babcfccc7209657f2541dd0818c5cb0)
private FutureCallback<OfHeader> callback;
private OfHeader message;
private boolean completed;
private FutureCallback<OfHeader> callback;
private OfHeader message;
private boolean completed;
+ private boolean barrier;
private volatile boolean committed;
void commit(final OfHeader message, final FutureCallback<OfHeader> callback) {
this.message = message;
this.callback = callback;
private volatile boolean committed;
void commit(final OfHeader message, final FutureCallback<OfHeader> callback) {
this.message = message;
this.callback = callback;
+ this.barrier = message instanceof BarrierInput;
// Volatile write, needs to be last
committed = true;
}
void reset() {
// Volatile write, needs to be last
committed = true;
}
void reset() {
// Volatile write, needs to be last
committed = false;
}
boolean isBarrier() {
// Volatile write, needs to be last
committed = false;
}
boolean isBarrier() {
- return message instanceof BarrierInput;
}
boolean isCommitted() {
}
boolean isCommitted() {
- OfHeader getMessage() {
- return message;
+ OfHeader takeMessage() {
+ final OfHeader ret = message;
+ message = null;
+ return ret;
}
boolean complete(final OfHeader response) {
}
boolean complete(final OfHeader response) {
- Preconditions.checkState(!completed, "Attempted to complete a completed message %s with response %s", message, response);
+ Preconditions.checkState(!completed, "Attempted to complete a completed message with response %s", response);
// Multipart requests are special, we have to look at them to see
// if there is something outstanding and adjust ourselves accordingly
// Multipart requests are special, we have to look at them to see
// if there is something outstanding and adjust ourselves accordingly
callback.onFailure(cause);
}
} else {
callback.onFailure(cause);
}
} else {
- LOG.warn("Ignoring failure {} for completed message {}", cause, message);
+ LOG.warn("Ignoring failure {} for completed message", cause);
- final OfHeader msg = entry.getMessage();
+ final OfHeader msg = entry.takeMessage();
flushOffset++;
if (msg != null) {
return msg;
flushOffset++;
if (msg != null) {
return msg;