import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private static final int WORKTIME_RECHECK_MSGS = 64;
/**
- * We maintain a cache of this many previous queues for later reuse.
+ * Default low write watermark. The channel will become writable when the number
+ * of outstanding bytes dips below this value.
*/
- private static final int QUEUE_CACHE_SIZE = 4;
+ private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
+
+ /**
+ * Default high write watermark. The channel will become un-writable when the number
+ * of outstanding bytes hits this value.
+ */
+ private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
+
- private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
private final AtomicBoolean flushScheduled = new AtomicBoolean();
private final ConnectionAdapterImpl parent;
private final InetSocketAddress address;
private final long maxBarrierNanos;
private final long maxWorkTime;
- private final int queueSize;
private final T handler;
// Updated from netty only
private long lastBarrierNanos = System.nanoTime();
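+ // Cache slice from which queue instances are obtained; its reference is released on close/disconnect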
+ private OutboundQueueCacheSlice slice;
private OutboundQueueImpl currentQueue;
private boolean barrierTimerEnabled;
private int nonBarrierMessages;
private long lastXid = 0;
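+ // Non-null once shutdown has started; holds the offset returned by startShutdown() on the current queue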
+ private Integer shutdownOffset;
// Passed to executor to request triggering of flush
private final Runnable flushRunnable = new Runnable() {
@Override
public void run() {
flush();
}
};
+
+ // Passed to executor to request a periodic barrier check
private final Runnable barrierRunnable = new Runnable() {
@Override
public void run() {
barrier();
}
};
OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
- final int queueSize, final long maxBarrierNanos) {
+ final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
this.parent = Preconditions.checkNotNull(parent);
this.handler = Preconditions.checkNotNull(handler);
- Preconditions.checkArgument(queueSize > 0);
- this.queueSize = queueSize;
+ this.slice = Preconditions.checkNotNull(slice);
Preconditions.checkArgument(maxBarrierNanos > 0);
this.maxBarrierNanos = maxBarrierNanos;
this.address = address;
this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
- LOG.debug("Queue manager instantiated with queue size {}", queueSize);
+ LOG.debug("Queue manager instantiated with queue slice {}", slice);
createQueue();
}
@Override
public void close() {
handler.onConnectionQueueChanged(null);
- }
-
- private void retireQueue(final OutboundQueueImpl queue) {
- if (queueCache.offer(queue)) {
- LOG.debug("Saving queue {} for later reuse", queue);
- } else {
- LOG.debug("Queue {} thrown away", queue);
+ if (slice != null) {
+ slice.decRef();
+ slice = null;
}
}
private void createQueue() {
final long baseXid = lastXid;
- lastXid += queueSize + 1;
-
- final OutboundQueueImpl cached = queueCache.poll();
- final OutboundQueueImpl queue;
- if (cached != null) {
- queue = cached.reuse(baseXid);
- LOG.debug("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
- } else {
- queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
- LOG.debug("Allocated new queue {} on channel {}", queue, parent.getChannel());
- }
+ lastXid += slice.getQueueSize() + 1;
+ final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
activeQueues.add(queue);
currentQueue = queue;
handler.onConnectionQueueChanged(queue);
private void scheduleBarrierTimer(final long now) {
long next = lastBarrierNanos + maxBarrierNanos;
if (next < now) {
- LOG.debug("Attempted to schedule barrier in the past, reset maximum)");
+ LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
next = now + maxBarrierNanos;
}
final long delay = next - now;
- LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
+ LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
barrierTimerEnabled = true;
}
private void scheduleBarrierMessage() {
final Long xid = currentQueue.reserveBarrierIfNeeded();
if (xid == null) {
- LOG.debug("Queue {} already contains a barrier, not scheduling one", currentQueue);
+ LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
return;
}
currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
- LOG.debug("Barrier XID {} scheduled", xid);
+ LOG.trace("Barrier XID {} scheduled", xid);
}
/**
}
if (message instanceof BarrierInput) {
- LOG.debug("Barrier message seen, resetting counters");
+ LOG.trace("Barrier message seen, resetting counters");
nonBarrierMessages = 0;
lastBarrierNanos = now;
} else {
nonBarrierMessages++;
- if (nonBarrierMessages >= queueSize) {
- LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
+ if (nonBarrierMessages >= slice.getQueueSize()) {
+ LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
scheduleBarrierMessage();
} else if (!barrierTimerEnabled) {
scheduleBarrierTimer(now);
* @return True if the message matched a previous request, false otherwise.
*/
boolean onMessage(final OfHeader message) {
- LOG.debug("Attempting to pair message {} to a request", message);
+ LOG.trace("Attempting to pair message {} to a request", message);
Iterator<OutboundQueueImpl> it = activeQueues.iterator();
while (it.hasNext()) {
continue;
}
- LOG.debug("Queue {} accepted response {}", queue, message);
+ LOG.trace("Queue {} accepted response {}", queue, message);
// This has been a barrier request, we need to flush all
// previous queues
if (entry.isBarrier() && activeQueues.size() > 1) {
- LOG.debug("Queue {} indicated request was a barrier", queue);
+ LOG.trace("Queue {} indicated request was a barrier", queue);
it = activeQueues.iterator();
while (it.hasNext()) {
// We want to complete all queues before the current one, we will
// complete the current queue below
if (!queue.equals(q)) {
- LOG.debug("Queue {} is implied finished", q);
+ LOG.trace("Queue {} is implied finished", q);
q.completeAll();
it.remove();
- retireQueue(q);
+ slice.putQueue(q);
} else {
break;
}
}
if (queue.isFinished()) {
- LOG.debug("Queue {} is finished", queue);
+ LOG.trace("Queue {} is finished", queue);
it.remove();
- retireQueue(queue);
+ slice.putQueue(queue);
}
return true;
}
private void scheduleFlush() {
- if (parent.getChannel().isWritable()) {
- if (flushScheduled.compareAndSet(false, true)) {
- LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
- parent.getChannel().eventLoop().execute(flushRunnable);
- } else {
- LOG.trace("Flush task is already present on channel {}", parent.getChannel());
- }
+ if (flushScheduled.compareAndSet(false, true)) {
+ LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
+ parent.getChannel().eventLoop().execute(flushRunnable);
} else {
- LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
+ LOG.trace("Flush task is already present on channel {}", parent.getChannel());
}
}
protected void barrier() {
LOG.debug("Channel {} barrier timer expired", parent.getChannel());
barrierTimerEnabled = false;
- if (currentQueue == null) {
- LOG.debug("Channel shut down, not processing barrier");
+ if (shutdownOffset != null) {
+ LOG.trace("Channel shut down, not processing barrier");
return;
}
LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
// FIXME: we should be tracking requests/responses instead of this
if (nonBarrierMessages == 0) {
- LOG.debug("No messages written since last barrier, not issuing one");
+ LOG.trace("No messages written since last barrier, not issuing one");
} else {
scheduleBarrierMessage();
}
}
}
+ private void rescheduleFlush() {
+ /*
+ * We are almost ready to terminate. This is a bit tricky, because
+ * we do not want to have a race window where a message would be
+ * stuck on the queue without a flush being scheduled.
+ *
+ * So we mark ourselves as not running and then re-check if a
+ * flush is needed. That will re-synchronize with other threads
+ * such that only one flush is scheduled at any given time.
+ */
+ if (!flushScheduled.compareAndSet(true, false)) {
+ LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
+ }
+
+ conditionalFlush();
+ }
+
+ private void shutdownFlush() {
+ long entries = 0;
+
+ // Fail all queues
+ final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
+ while (it.hasNext()) {
+ final OutboundQueueImpl queue = it.next();
+
+ entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
+ if (queue.isFinished()) {
+ LOG.trace("Cleared queue {}", queue);
+ it.remove();
+ }
+ }
+
+ LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
+
+ Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
+ if (currentQueue.isShutdown(shutdownOffset)) {
+ currentQueue = null;
+ handler.onConnectionQueueChanged(null);
+ LOG.debug("Channel {} shutdown complete", parent.getChannel());
+ } else {
+ LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
+ rescheduleFlush();
+ }
+ }
+
/**
- * Perform a single flush operation.
+ * Perform a single flush operation. We keep it here so we do not generate
+ * synthetic accessors for private fields. Otherwise it could be moved into
+ * {@link #flushRunnable}.
*/
protected void flush() {
+ // If the channel is gone, just flush whatever is not completed
+ if (shutdownOffset != null) {
+ shutdownFlush();
+ return;
+ }
+
final long start = System.nanoTime();
final long deadline = start + maxWorkTime;
long messages = 0;
for (;; ++messages) {
if (!parent.getChannel().isWritable()) {
- LOG.trace("Channel is no longer writable");
+ LOG.debug("Channel {} is no longer writable", parent.getChannel());
break;
}
LOG.debug("Flushed {} messages in {}us to channel {}",
messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
- /*
- * We are almost ready to terminate. This is a bit tricky, because
- * we do not want to have a race window where a message would be
- * stuck on the queue without a flush being scheduled.
- *
- * So we mark ourselves as not running and then re-check if a
- * flush out is needed. That will re-synchronized with other threads
- * such that only one flush is scheduled at any given time.
- */
- if (!flushScheduled.compareAndSet(true, false)) {
- LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
- }
-
- conditionalFlush();
+ rescheduleFlush();
}
-
/**
* Schedule a queue flush if it is not empty and the channel is found
* to be writable. May only be called from Netty context.
*/
private void conditionalFlush() {
- if (!currentQueue.isEmpty()) {
- scheduleFlush();
+ if (currentQueue.needsFlush()) {
+ if (shutdownOffset != null || parent.getChannel().isWritable()) {
+ scheduleFlush();
+ } else {
+ LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
+ }
} else {
LOG.trace("Queue is empty, no flush needed");
}
conditionalFlush(ctx);
}
+ @Override
+ public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
+ /*
+ * Tune channel write buffering. We increase the writability window
+ * to ensure we can flush an entire queue segment in one go. We definitely
+ * want to keep the difference above 64k, as that will ensure we use jam-packed
+ * TCP packets. UDP will fragment as appropriate.
+ */
+ ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
+ ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
+
+ super.handlerAdded(ctx);
+ }
+
@Override
public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
- conditionalFlush(ctx);
+
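+ // This callback runs on the channel's event loop, so we can invoke flush() directly instead of scheduling the flush task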
+ if (flushScheduled.compareAndSet(false, true)) {
+ LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
+ flush();
+ } else {
+ LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
+ }
}
@Override
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
- long entries = 0;
- LOG.debug("Channel shutdown, flushing queue...");
- handler.onConnectionQueueChanged(null);
+ LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
- final Throwable cause = new RejectedExecutionException("Channel disconnected");
- for (OutboundQueueImpl queue : activeQueues) {
- entries += queue.failAll(cause);
+ /*
+ * We are dealing with a multi-threaded shutdown, as the user may still
+ * be reserving entries in the queue. We are executing in a netty thread,
+ * so neither flush nor barrier can be running, which is good news.
+ *
+ * We will eat up all the slots in the queue here, mark the first reserved
+ * offset and free up all the cached queues. We then schedule
+ * the flush task, which will deal with the rest of the shutdown process.
+ */
+ shutdownOffset = currentQueue.startShutdown();
+ if (slice != null) {
+ slice.decRef();
+ slice = null;
}
- activeQueues.clear();
- LOG.debug("Flushed {} queue entries", entries);
+ LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
+ scheduleFlush();
}
@Override
public String toString() {
return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
}
+
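+ /**
+ * Reply to an inbound echo request directly on the channel, bypassing the
+ * outbound queue.
+ */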
+ void onEchoRequest(final EchoRequestMessage message) {
+ final EchoReplyInput reply = new EchoReplyInputBuilder()
+ .setData(message.getData())
+ .setVersion(message.getVersion())
+ .setXid(message.getXid())
+ .build();
+ parent.getChannel().writeAndFlush(reply);
+ }
}