import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.locks.LockSupport;
import javax.annotation.Nonnull;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
- private static final long FLUSH_RETRY_NANOS = 1L;
private final OutboundQueueManager<?> manager;
private final OutboundQueueEntry[] queue;
private final long baseXid;
for (;;) {
// No message ready
if (isEmpty()) {
- LOG.debug("Flush offset {} is uptodate with reserved", flushOffset);
+ LOG.trace("Flushed all reserved entries up to ", flushOffset);
return null;
}
- boolean retry = true;
- while (!queue[flushOffset].isCommitted()) {
- if (!retry) {
- LOG.debug("Offset {} not ready yet, giving up", flushOffset);
- return null;
- }
-
- LOG.debug("Offset {} not ready yet, retrying", flushOffset);
- LockSupport.parkNanos(FLUSH_RETRY_NANOS);
- retry = false;
+ final OutboundQueueEntry entry = queue[flushOffset];
+ if (!entry.isCommitted()) {
+ LOG.trace("Request at offset {} not ready yet, giving up", flushOffset);
+ return null;
}
- final OfHeader msg = queue[flushOffset++].getMessage();
+ final OfHeader msg = entry.getMessage();
+ flushOffset++;
if (msg != null) {
return msg;
}
+
+ LOG.trace("Null message, skipping to offset {}", flushOffset);
}
}