private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueImpl.class);
private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> CURRENT_OFFSET_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "reserveOffset");
+ private static final AtomicIntegerFieldUpdater<OutboundQueueImpl> BARRIER_OFFSET_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(OutboundQueueImpl.class, "barrierOffset");
private static final long FLUSH_RETRY_NANOS = 1L;
private final OutboundQueueManager<?> manager;
private final OutboundQueueEntry[] queue;
private final int reserve;
// Updated concurrently
- private volatile int reserveOffset;
+ private volatile int barrierOffset = -1;
+ private volatile int reserveOffset = 0;
// Updated from Netty only
private int flushOffset;
return new OutboundQueueImpl(manager, baseXid, queue);
}
- Long reserveEntry(final boolean forBarrier) {
+ @Override
+ public Long reserveEntry() {
+ return reserveEntry(false);
+ }
+
+ @Override
+ public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
+ final int offset = (int)(xid - baseXid);
+ if (message != null) {
+ Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+ }
+
+ final OutboundQueueEntry entry = queue[offset];
+ entry.commit(message, callback);
+ LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
+
+ if (entry.isBarrier()) {
+ int my = offset;
+ for (;;) {
+ final int prev = BARRIER_OFFSET_UPDATER.getAndSet(this, my);
+ if (prev < my) {
+ LOG.debug("Queue {} recorded pending barrier offset {}", this, my);
+ break;
+ }
+
+ // We have traveled back, recover
+ my = prev;
+ }
+ }
+
+ manager.ensureFlushing(this);
+ }
+
+ private Long reserveEntry(final boolean forBarrier) {
final int offset = CURRENT_OFFSET_UPDATER.getAndIncrement(this);
if (offset >= reserve) {
if (forBarrier) {
return xid;
}
- @Override
- public Long reserveEntry() {
- return reserveEntry(false);
- }
-
- @Override
- public void commitEntry(final Long xid, final OfHeader message, final FutureCallback<OfHeader> callback) {
- final int offset = (int)(xid - baseXid);
- if (message != null) {
- Preconditions.checkArgument(xid.equals(message.getXid()), "Message %s has wrong XID %s, expected %s", message, message.getXid(), xid);
+ Long reserveBarrierIfNeeded() {
+ final int bo = barrierOffset;
+ if (bo >= flushOffset) {
+ LOG.debug("Barrier found at offset {} (currently at {})", bo, flushOffset);
+ return null;
+ } else {
+ return reserveEntry(true);
}
-
- queue[offset].commit(message, callback);
- LOG.debug("Queue {} XID {} at offset {} (of {}) committed", this, xid, offset, reserveOffset);
-
- manager.ensureFlushing(this);
}
/**
package org.opendaylight.openflowjava.protocol.impl.core.connection;
import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
+ private final AtomicBoolean flushScheduled = new AtomicBoolean();
private final ConnectionAdapterImpl parent;
private final InetSocketAddress address;
private final long maxBarrierNanos;
private final int queueSize;
private final T handler;
- /*
- * Instead of using an AtomicBoolean object, we use these two. It saves us
- * from allocating an extra object.
- */
- @SuppressWarnings("rawtypes")
- private static final AtomicIntegerFieldUpdater<OutboundQueueManager> FLUSH_SCHEDULED_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(OutboundQueueManager.class, "flushScheduled");
- private volatile int flushScheduled = 0;
-
// Updated from netty only
private long lastBarrierNanos = System.nanoTime();
private OutboundQueueImpl currentQueue;
+ private boolean barrierTimerEnabled;
private int nonBarrierMessages;
private long lastXid = 0;
LOG.debug("Queue manager instantiated with queue size {}", queueSize);
createQueue();
- scheduleBarrierTimer(lastBarrierNanos);
}
T getHandler() {
final long delay = next - now;
LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
+ barrierTimerEnabled = true;
}
private void scheduleBarrierMessage() {
- final Long xid = currentQueue.reserveEntry(true);
- Verify.verifyNotNull(xid);
+ final Long xid = currentQueue.reserveBarrierIfNeeded();
+ if (xid == null) {
+ LOG.debug("Queue {} already contains a barrier, not scheduling one", currentQueue);
+ return;
+ }
currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
LOG.debug("Barrier XID {} scheduled", xid);
-
- // We can see into the future when compared to flushEntry(), as that
- // codepath may be lagging behind on messages. Resetting the counter
- // here ensures that flushEntry() will not attempt to issue a flush
- // request. Note that we do not reset current time, as that should
- // reflect when we sent the message for real.
- nonBarrierMessages = 0;
}
/**
if (nonBarrierMessages >= queueSize) {
LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
scheduleBarrierMessage();
+ } else if (!barrierTimerEnabled) {
+ scheduleBarrierTimer(now);
}
}
private void scheduleFlush() {
if (parent.getChannel().isWritable()) {
- if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
+ if (flushScheduled.compareAndSet(false, true)) {
LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
parent.getChannel().eventLoop().execute(flushRunnable);
} else {
*/
protected void barrier() {
LOG.debug("Channel {} barrier timer expired", parent.getChannel());
+ barrierTimerEnabled = false;
if (currentQueue == null) {
LOG.debug("Channel shut down, not processing barrier");
return;
scheduleBarrierMessage();
}
}
-
- scheduleBarrierTimer(now);
}
/**
* flush out is needed. That will re-synchronized with other threads
* such that only one flush is scheduled at any given time.
*/
- if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
+ if (!flushScheduled.compareAndSet(true, false)) {
LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
}
@Override
public String toString() {
- return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled);
+ return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
}
}