X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=openflow-protocol-impl%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowjava%2Fprotocol%2Fimpl%2Fcore%2Fconnection%2FOutboundQueueManager.java;h=90db23da65d3dfa37b245edf3ce1b6af5d7fd951;hb=2429e5af03939e1f8db057247a654e361a266645;hp=b3b7b8c384b49af1a09ec2e56a214a8ba6c37d8a;hpb=cefbc1e206effcc6d7dc39d860ab2829d3c3cf2a;p=openflowjava.git

diff --git a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java
index b3b7b8c3..90db23da 100644
--- a/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java
+++ b/openflow-protocol-impl/src/main/java/org/opendaylight/openflowjava/protocol/impl/core/connection/OutboundQueueManager.java
@@ -8,65 +8,25 @@
 package org.opendaylight.openflowjava.protocol.impl.core.connection;
 
 import com.google.common.base.Preconditions;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
 import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
+final class OutboundQueueManager<T extends OutboundQueueHandler> extends
+        AbstractOutboundQueueManager<T, StackedOutboundQueue> {
     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
 
-    /**
-     * This is the default upper bound we place on the flush task running
-     * a single iteration. We relinquish control after about this amount
-     * of time.
-     */
-    private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
-
-    /**
-     * We re-check the time spent flushing every this many messages. We do this because
-     * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
-     * or similar to disable the feature.
-     */
-    private static final int WORKTIME_RECHECK_MSGS = 64;
-
-    private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
-    private final AtomicBoolean flushScheduled = new AtomicBoolean();
-    private final ConnectionAdapterImpl parent;
-    private final InetSocketAddress address;
+    private final int maxNonBarrierMessages;
     private final long maxBarrierNanos;
-    private final long maxWorkTime;
-    private final T handler;
 
     // Updated from netty only
-    private long lastBarrierNanos = System.nanoTime();
-    private OutboundQueueCacheSlice slice;
-    private OutboundQueueImpl currentQueue;
     private boolean barrierTimerEnabled;
+    private long lastBarrierNanos = System.nanoTime();
    private int nonBarrierMessages;
-    private long lastXid = 0;
-    private Integer shutdownOffset;
-
-    // Passed to executor to request triggering of flush
-    private final Runnable flushRunnable = new Runnable() {
-        @Override
-        public void run() {
-            flush();
-        }
-    };
 
     // Passed to executor to request a periodic barrier check
     private final Runnable barrierRunnable = new Runnable() {
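Context for the hunk above: the refactor strips this class down to barrier accounting. Flush scheduling, the flush runnable, the queue cache slice and the work-time bookkeeping all move behind AbstractOutboundQueueManager, while OutboundQueueManager retains only the two thresholds (maxNonBarrierMessages, maxBarrierNanos) and the counters they drive. The following stand-alone sketch models that retained policy; all names in it are illustrative, not part of the real API:

    import java.util.concurrent.TimeUnit;

    // Stand-alone model of the barrier accounting this class now keeps.
    // Names are illustrative; only the two-threshold policy mirrors the diff.
    final class BarrierPolicy {
        private final int maxNonBarrierMessages;
        private final long maxBarrierNanos;
        private long lastBarrierNanos = System.nanoTime();
        private int nonBarrierMessages;

        BarrierPolicy(final int maxNonBarrierMessages, final long maxBarrierNanos) {
            this.maxNonBarrierMessages = maxNonBarrierMessages;
            this.maxBarrierNanos = maxBarrierNanos;
        }

        // Returns true when the caller should schedule a barrier request.
        // The counters reset only once the barrier itself is written.
        boolean messageWritten(final boolean isBarrier, final long now) {
            if (isBarrier) {
                nonBarrierMessages = 0;
                lastBarrierNanos = now;
                return false;
            }
            return ++nonBarrierMessages >= maxNonBarrierMessages
                    || now - lastBarrierNanos >= maxBarrierNanos;
        }

        public static void main(final String[] args) {
            final BarrierPolicy policy = new BarrierPolicy(3, TimeUnit.MILLISECONDS.toNanos(500));
            for (int i = 1; i <= 4; i++) {
                System.out.println("message " + i + " -> barrier needed: "
                        + policy.messageWritten(false, System.nanoTime()));
            }
        }
    }

A barrier resets both counters because the switch's barrier reply acknowledges everything ordered before it; tripping either threshold forces a fresh barrier so that acknowledgement keeps arriving.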
@@ -77,40 +37,17 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     };
 
     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
-            final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
-        this.parent = Preconditions.checkNotNull(parent);
-        this.handler = Preconditions.checkNotNull(handler);
-        this.slice = Preconditions.checkNotNull(slice);
+            final int maxNonBarrierMessages, final long maxBarrierNanos) {
+        super(parent, address, handler);
+        Preconditions.checkArgument(maxNonBarrierMessages > 0);
+        this.maxNonBarrierMessages = maxNonBarrierMessages;
         Preconditions.checkArgument(maxBarrierNanos > 0);
         this.maxBarrierNanos = maxBarrierNanos;
-        this.address = address;
-        this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
-
-        LOG.debug("Queue manager instantiated with queue slice {}", slice);
-        createQueue();
-    }
-
-    T getHandler() {
-        return handler;
     }
 
     @Override
-    public void close() {
-        handler.onConnectionQueueChanged(null);
-        if (slice != null) {
-            slice.decRef();
-            slice = null;
-        }
-    }
-
-    private void createQueue() {
-        final long baseXid = lastXid;
-        lastXid += slice.getQueueSize() + 1;
-
-        final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
-        activeQueues.add(queue);
-        currentQueue = queue;
-        handler.onConnectionQueueChanged(queue);
+    protected StackedOutboundQueue initializeStackedOutboudnqueue() {
+        return new StackedOutboundQueue(this);
     }
 
     private void scheduleBarrierTimer(final long now) {
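The rewritten constructor validates both thresholds up front and no longer builds queues itself: the base class calls back into initializeStackedOutboudnqueue() (method name as it appears in the source) to obtain the concrete queue. That is the template-method shape, sketched below with illustrative types, under the assumption that the base class materializes the queue during construction:

    // Minimal template-method sketch: the abstract manager owns the queue
    // lifecycle and asks the subclass for the concrete queue type. All
    // names are illustrative; the real types live in this package.
    abstract class AbstractManager<Q> {
        // Note: the callback runs during superclass construction, so an
        // override must not rely on subclass fields.
        private final Q queue = initializeQueue();

        protected abstract Q initializeQueue();

        final Q getQueue() {
            return queue;
        }
    }

    final class StackedManager extends AbstractManager<java.util.ArrayDeque<Long>> {
        @Override
        protected java.util.ArrayDeque<Long> initializeQueue() {
            return new java.util.ArrayDeque<>();
        }

        public static void main(final String[] args) {
            System.out.println(new StackedManager().getQueue().isEmpty()); // true
        }
    }

Because the callback fires while the superclass constructor is still running, the real override does nothing beyond handing `this` to the new StackedOutboundQueue, which keeps that initialization hazard out of play.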
@@ -133,119 +70,10 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
             return;
         }
 
-        currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
+        currentQueue.commitEntry(xid, getHandler().createBarrierRequest(xid), null);
         LOG.trace("Barrier XID {} scheduled", xid);
     }
 
-    /**
-     * Flush an entry from the queue.
-     *
-     * @param now Time reference for 'now'. We take this as an argument, as
-     *            we need a timestamp to mark barrier messages we see swinging
-     *            by. That timestamp does not need to be completely accurate,
-     *            hence we use the flush start time. Alternative would be to
-     *            measure System.nanoTime() for each barrier -- needlessly
-     *            adding overhead.
-     *
-     * @return Entry which was flushed, null if no entry is ready.
-     */
-    OfHeader flushEntry(final long now) {
-        final OfHeader message = currentQueue.flushEntry();
-        if (currentQueue.isFlushed()) {
-            LOG.debug("Queue {} is fully flushed", currentQueue);
-            createQueue();
-        }
-
-        if (message == null) {
-            return null;
-        }
-
-        if (message instanceof BarrierInput) {
-            LOG.trace("Barrier message seen, resetting counters");
-            nonBarrierMessages = 0;
-            lastBarrierNanos = now;
-        } else {
-            nonBarrierMessages++;
-            if (nonBarrierMessages >= slice.getQueueSize()) {
-                LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
-                scheduleBarrierMessage();
-            } else if (!barrierTimerEnabled) {
-                scheduleBarrierTimer(now);
-            }
-        }
-
-        return message;
-    }
-
-    /**
-     * Invoked whenever a message comes in from the switch. Runs matching
-     * on all active queues in an attempt to complete a previous request.
-     *
-     * @param message Potential response message
-     * @return True if the message matched a previous request, false otherwise.
-     */
-    boolean onMessage(final OfHeader message) {
-        LOG.trace("Attempting to pair message {} to a request", message);
-
-        Iterator<OutboundQueueImpl> it = activeQueues.iterator();
-        while (it.hasNext()) {
-            final OutboundQueueImpl queue = it.next();
-            final OutboundQueueEntry entry = queue.pairRequest(message);
-
-            if (entry == null) {
-                continue;
-            }
-
-            LOG.trace("Queue {} accepted response {}", queue, message);
-
-            // This has been a barrier request, we need to flush all
-            // previous queues
-            if (entry.isBarrier() && activeQueues.size() > 1) {
-                LOG.trace("Queue {} indicated request was a barrier", queue);
-
-                it = activeQueues.iterator();
-                while (it.hasNext()) {
-                    final OutboundQueueImpl q = it.next();
-
-                    // We want to complete all queues before the current one, we will
-                    // complete the current queue below
-                    if (!queue.equals(q)) {
-                        LOG.trace("Queue {} is implied finished", q);
-                        q.completeAll();
-                        it.remove();
-                        slice.putQueue(q);
-                    } else {
-                        break;
-                    }
-                }
-            }
-
-            if (queue.isFinished()) {
-                LOG.trace("Queue {} is finished", queue);
-                it.remove();
-                slice.putQueue(queue);
-            }
-
-            return true;
-        }
-
-        LOG.debug("Failed to find completion for message {}", message);
-        return false;
-    }
-
-    private void scheduleFlush() {
-        if (flushScheduled.compareAndSet(false, true)) {
-            LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
-            parent.getChannel().eventLoop().execute(flushRunnable);
-        } else {
-            LOG.trace("Flush task is already present on channel {}", parent.getChannel());
-        }
-    }
-
-    void ensureFlushing(final OutboundQueueImpl queue) {
-        Preconditions.checkState(currentQueue.equals(queue));
-        scheduleFlush();
-    }
 
     /**
      * Periodic barrier check.
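The large deletion above removes the per-queue XID pairing: onMessage() walked every active queue, matched a response to its pending request, and treated a barrier response as implicitly completing every queue ordered before it. Below is a minimal stand-alone sketch of that pairing idea, using a single pending-request queue instead of the original queue-of-queues; all names are illustrative:

    import java.util.ArrayDeque;
    import java.util.Iterator;
    import java.util.Queue;

    // Sketch of the request/response pairing the removed onMessage() did.
    final class XidPairing {
        private static final class Pending {
            final long xid;
            final boolean barrier;

            Pending(final long xid, final boolean barrier) {
                this.xid = xid;
                this.barrier = barrier;
            }
        }

        private final Queue<Pending> pending = new ArrayDeque<>();

        void requestSent(final long xid, final boolean barrier) {
            pending.add(new Pending(xid, barrier));
        }

        // Returns true if the response completed an outstanding request. A
        // barrier response also implicitly completes everything before it.
        boolean responseReceived(final long xid) {
            final Iterator<Pending> it = pending.iterator();
            while (it.hasNext()) {
                final Pending p = it.next();
                if (p.xid != xid) {
                    continue;
                }
                it.remove();
                if (p.barrier) {
                    pending.removeIf(q -> q.xid < xid);
                }
                return true;
            }
            return false;
        }

        public static void main(final String[] args) {
            final XidPairing pairing = new XidPairing();
            pairing.requestSent(1, false);
            pairing.requestSent(2, false);
            pairing.requestSent(3, true);
            System.out.println(pairing.responseReceived(3)); // true, also completes 1 and 2
            System.out.println(pairing.responseReceived(1)); // false, already completed
        }
    }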
@@ -253,7 +81,7 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
     protected void barrier() {
         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
         barrierTimerEnabled = false;
-        if (shutdownOffset != null) {
+        if (shuttingDown) {
             LOG.trace("Channel shut down, not processing barrier");
             return;
         }
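barrier() now consults the shuttingDown flag inherited from the base class instead of the old shutdownOffset marker. For context, the timer driving this method is a one-shot arrangement: scheduleBarrierTimer() arms it only while it is not already armed, and the expiry handler clears barrierTimerEnabled before deciding whether a barrier is actually due. A self-contained sketch of that pattern follows; it uses a plain ScheduledExecutorService where the real code uses the channel's event loop, and the delay computation is an assumption since the diff does not show it:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // One-shot barrier timer sketch; names and delay math are illustrative.
    final class BarrierTimer {
        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();
        private final long maxBarrierNanos = TimeUnit.MILLISECONDS.toNanos(500);
        private boolean barrierTimerEnabled;
        private long lastBarrierNanos = System.nanoTime();

        // Arm the timer only if it is not armed already.
        synchronized void scheduleBarrierTimer(final long now) {
            if (barrierTimerEnabled) {
                return;
            }
            final long delay = Math.max(0, lastBarrierNanos + maxBarrierNanos - now);
            executor.schedule(this::barrier, delay, TimeUnit.NANOSECONDS);
            barrierTimerEnabled = true;
        }

        // Expiry handler: clear the flag first, then decide whether a
        // barrier is due, mirroring barrier() in the hunk above.
        private synchronized void barrier() {
            barrierTimerEnabled = false;
            if (System.nanoTime() - lastBarrierNanos >= maxBarrierNanos) {
                System.out.println("barrier due, would commit a BarrierInput");
                lastBarrierNanos = System.nanoTime();
            }
        }

        public static void main(final String[] args) throws InterruptedException {
            final BarrierTimer timer = new BarrierTimer();
            timer.scheduleBarrierTimer(System.nanoTime());
            Thread.sleep(600); // let the one-shot timer fire
            timer.executor.shutdown();
        }
    }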
@@ -271,184 +99,31 @@ final class OutboundQueueManager<T extends OutboundQueueHandler> extends Channel
         }
     }
 
-    private void rescheduleFlush() {
-        /*
-         * We are almost ready to terminate. This is a bit tricky, because
-         * we do not want to have a race window where a message would be
-         * stuck on the queue without a flush being scheduled.
-         *
-         * So we mark ourselves as not running and then re-check if a
-         * flush out is needed. That will re-synchronized with other threads
-         * such that only one flush is scheduled at any given time.
-         */
-        if (!flushScheduled.compareAndSet(true, false)) {
-            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
-        }
-
-        conditionalFlush();
-    }
-
-    private void shutdownFlush() {
-        long entries = 0;
-
-        // Fail all queues
-        final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
-        while (it.hasNext()) {
-            final OutboundQueueImpl queue = it.next();
-
-            entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
-            if (queue.isFinished()) {
-                LOG.trace("Cleared queue {}", queue);
-                it.remove();
-            }
-        }
-
-        LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
-
-        Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
-        if (currentQueue.isShutdown(shutdownOffset)) {
-            currentQueue = null;
-            handler.onConnectionQueueChanged(null);
-            LOG.debug("Channel {} shutdown complete", parent.getChannel());
-        } else {
-            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
-            rescheduleFlush();
-        }
-    }
-
-    /**
-     * Perform a single flush operation.
-     */
-    protected void flush() {
-        // If the channel is gone, just flush whatever is not completed
-        if (shutdownOffset != null) {
-            shutdownFlush();
-            return;
-        }
-
-        final long start = System.nanoTime();
-        final long deadline = start + maxWorkTime;
-
-        LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
-
-        long messages = 0;
-        for (;; ++messages) {
-            if (!parent.getChannel().isWritable()) {
-                LOG.debug("Channel {} is no longer writable", parent.getChannel());
-                break;
-            }
-
-            final OfHeader message = flushEntry(start);
-            if (message == null) {
-                LOG.trace("The queue is completely drained");
-                break;
-            }
-
-            final Object wrapper;
-            if (address == null) {
-                wrapper = new MessageListenerWrapper(message, null);
-            } else {
-                wrapper = new UdpMessageListenerWrapper(message, null, address);
-            }
-            parent.getChannel().write(wrapper);
-
-            /*
-             * Check every WORKTIME_RECHECK_MSGS for exceeded time.
-             *
-             * XXX: given we already measure our flushing throughput, we
-             *      should be able to perform dynamic adjustments here.
-             *      is that additional complexity needed, though?
-             */
-            if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
-                LOG.trace("Exceeded allotted work time {}us",
-                    TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
-                break;
-            }
-        }
-
-        if (messages > 0) {
-            LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
-            parent.getChannel().flush();
-        }
-
-        final long stop = System.nanoTime();
-        LOG.debug("Flushed {} messages in {}us to channel {}",
-            messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
-
-        rescheduleFlush();
-    }
-
     /**
-     * Schedule a queue flush if it is not empty and the channel is found
-     * to be writable. May only be called from Netty context.
+     * Write a message into the underlying channel.
+     *
+     * @param now Time reference for 'now'. We take this as an argument, as
+     *            we need a timestamp to mark barrier messages we see swinging
+     *            by. That timestamp does not need to be completely accurate,
+     *            hence we use the flush start time. Alternative would be to
+     *            measure System.nanoTime() for each barrier -- needlessly
+     *            adding overhead.
      */
-    private void conditionalFlush() {
-        if (currentQueue.needsFlush()) {
-            if (shutdownOffset != null || parent.getChannel().isWritable()) {
-                scheduleFlush();
-            } else {
-                LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
-            }
-        } else {
-            LOG.trace("Queue is empty, no flush needed");
-        }
-    }
-
-    private void conditionalFlush(final ChannelHandlerContext ctx) {
-        Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
-        conditionalFlush();
-    }
-
     @Override
-    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
-        super.channelActive(ctx);
-        conditionalFlush(ctx);
-    }
-
-    @Override
-    public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
-        super.channelWritabilityChanged(ctx);
-
-        if (flushScheduled.compareAndSet(false, true)) {
-            LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
-            flush();
+    void writeMessage(final OfHeader message, final long now) {
+        super.writeMessage(message, now);
+        if (message instanceof BarrierInput) {
+            LOG.trace("Barrier message seen, resetting counters");
+            nonBarrierMessages = 0;
+            lastBarrierNanos = now;
         } else {
-            LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
-        }
-    }
-
-    @Override
-    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
-        super.channelInactive(ctx);
-
-        LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
-
-        /*
-         * We are dealing with a multi-threaded shutdown, as the user may still
-         * be reserving entries in the queue. We are executing in a netty thread,
-         * so neither flush nor barrier can be running, which is good news.
-         *
-         * We will eat up all the slots in the queue here and mark the offset first
-         * reserved offset and free up all the cached queues. We then schedule
-         * the flush task, which will deal with the rest of the shutdown process.
-         */
-        shutdownOffset = currentQueue.startShutdown();
-        if (slice != null) {
-            slice.decRef();
-            slice = null;
+            nonBarrierMessages++;
+            if (nonBarrierMessages >= maxNonBarrierMessages) {
+                LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
+                scheduleBarrierMessage();
+            } else if (!barrierTimerEnabled) {
+                scheduleBarrierTimer(now);
+            }
         }
     }
-
-        LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
-        scheduleFlush();
-    }
-
-    @Override
-    public String toString() {
-        return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
-    }
-
-    void onEchoRequest(final EchoRequestMessage message) {
-        final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
-        parent.getChannel().writeAndFlush(reply);
-    }
 }
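The deleted flush() loop is worth recording: it drained the queue on the event loop but deliberately yielded after roughly DEFAULT_WORKTIME_MICROS (about 100 milliseconds) of work, checking the clock only once every WORKTIME_RECHECK_MSGS (64) messages so that System.nanoTime() itself stayed cheap, exactly as its comments describe. A runnable stand-alone sketch of that deadline-bounded drain, with illustrative names:

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.TimeUnit;

    // Deadline-bounded drain loop modeled on the deleted flush().
    public final class BoundedDrain {
        private static final long MAX_WORK_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
        private static final int RECHECK_EVERY = 64;

        static long drain(final Queue<String> queue) {
            final long deadline = System.nanoTime() + MAX_WORK_NANOS;
            long messages = 0;
            String msg;
            while ((msg = queue.poll()) != null) {
                // channel.write(msg) would go here
                ++messages;
                // Only consult the clock every RECHECK_EVERY messages.
                if (messages % RECHECK_EVERY == 0 && System.nanoTime() >= deadline) {
                    break; // out of budget: reschedule and yield the thread
                }
            }
            return messages;
        }

        public static void main(final String[] args) {
            final Queue<String> queue = new ArrayDeque<>();
            for (int i = 0; i < 1000; i++) {
                queue.add("message-" + i);
            }
            System.out.println("drained " + drain(queue) + " messages");
        }
    }

After this change that fairness responsibility sits behind AbstractOutboundQueueManager, and the subclass's writeMessage() override only has to account for barriers.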