package org.opendaylight.openflowjava.protocol.impl.core.connection;
import com.google.common.base.Preconditions;
-import com.google.common.base.Verify;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
+ private final AtomicBoolean flushScheduled = new AtomicBoolean();
private final ConnectionAdapterImpl parent;
private final InetSocketAddress address;
private final long maxBarrierNanos;
private final int queueSize;
private final T handler;
- /*
- * Instead of using an AtomicBoolean object, we use these two. It saves us
- * from allocating an extra object.
- */
- @SuppressWarnings("rawtypes")
- private static final AtomicIntegerFieldUpdater<OutboundQueueManager> FLUSH_SCHEDULED_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(OutboundQueueManager.class, "flushScheduled");
- private volatile int flushScheduled = 0;
-
// Updated from netty only
private long lastBarrierNanos = System.nanoTime();
private OutboundQueueImpl currentQueue;
+ private boolean barrierTimerEnabled;
private int nonBarrierMessages;
private long lastXid = 0;
LOG.debug("Queue manager instantiated with queue size {}", queueSize);
createQueue();
- scheduleBarrierTimer(lastBarrierNanos);
}
T getHandler() {
final long delay = next - now;
LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
+ barrierTimerEnabled = true;
}
private void scheduleBarrierMessage() {
- final Long xid = currentQueue.reserveEntry(true);
- Verify.verifyNotNull(xid);
+ final Long xid = currentQueue.reserveBarrierIfNeeded();
+ if (xid == null) {
+ LOG.debug("Queue {} already contains a barrier, not scheduling one", currentQueue);
+ return;
+ }
currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
LOG.debug("Barrier XID {} scheduled", xid);
-
- // We can see into the future when compared to flushEntry(), as that
- // codepath may be lagging behind on messages. Resetting the counter
- // here ensures that flushEntry() will not attempt to issue a flush
- // request. Note that we do not reset current time, as that should
- // reflect when we sent the message for real.
- nonBarrierMessages = 0;
}
/**
if (nonBarrierMessages >= queueSize) {
LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
scheduleBarrierMessage();
+ } else if (!barrierTimerEnabled) {
+ scheduleBarrierTimer(now);
}
}
private void scheduleFlush() {
if (parent.getChannel().isWritable()) {
- if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
+ if (flushScheduled.compareAndSet(false, true)) {
LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
parent.getChannel().eventLoop().execute(flushRunnable);
} else {
*/
protected void barrier() {
LOG.debug("Channel {} barrier timer expired", parent.getChannel());
+ barrierTimerEnabled = false;
if (currentQueue == null) {
LOG.debug("Channel shut down, not processing barrier");
return;
scheduleBarrierMessage();
}
}
-
- scheduleBarrierTimer(now);
}
/**
* flush out is needed. That will re-synchronized with other threads
* such that only one flush is scheduled at any given time.
*/
- if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
+ if (!flushScheduled.compareAndSet(true, false)) {
LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
}
@Override
public String toString() {
- return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled);
+ return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
}
}