- /**
- * Flush an entry from the queue.
- *
- * @param now Time reference for 'now'. We take this as an argument, as
- * we need a timestamp to mark barrier messages we see swinging
- * by. That timestamp does not need to be completely accurate,
- * hence we use the flush start time. Alternative would be to
- * measure System.nanoTime() for each barrier -- needlessly
- * adding overhead.
- *
- * @return Entry which was flushed, null if no entry is ready.
- */
- OfHeader flushEntry(final long now) {
- final OfHeader message = currentQueue.flushEntry();
- if (currentQueue.isFlushed()) {
- LOG.debug("Queue {} is fully flushed", currentQueue);
- createQueue();
- }
-
- if (message == null) {
- return null;
- }
-
- if (message instanceof BarrierInput) {
- LOG.trace("Barrier message seen, resetting counters");
- nonBarrierMessages = 0;
- lastBarrierNanos = now;
- } else {
- nonBarrierMessages++;
- if (nonBarrierMessages >= queueSize) {
- LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
- scheduleBarrierMessage();
- } else if (!barrierTimerEnabled) {
- scheduleBarrierTimer(now);
- }
- }
-
- return message;
- }
-
- /**
- * Invoked whenever a message comes in from the switch. Runs matching
- * on all active queues in an attempt to complete a previous request.
- *
- * @param message Potential response message
- * @return True if the message matched a previous request, false otherwise.
- */
- boolean onMessage(final OfHeader message) {
- LOG.trace("Attempting to pair message {} to a request", message);
-
- Iterator<OutboundQueueImpl> it = activeQueues.iterator();
- while (it.hasNext()) {
- final OutboundQueueImpl queue = it.next();
- final OutboundQueueEntry entry = queue.pairRequest(message);
-
- if (entry == null) {
- continue;
- }
-
- LOG.trace("Queue {} accepted response {}", queue, message);
-
- // This has been a barrier request, we need to flush all
- // previous queues
- if (entry.isBarrier() && activeQueues.size() > 1) {
- LOG.trace("Queue {} indicated request was a barrier", queue);
-
- it = activeQueues.iterator();
- while (it.hasNext()) {
- final OutboundQueueImpl q = it.next();
-
- // We want to complete all queues before the current one, we will
- // complete the current queue below
- if (!queue.equals(q)) {
- LOG.trace("Queue {} is implied finished", q);
- q.completeAll();
- it.remove();
- retireQueue(q);
- } else {
- break;
- }
- }
- }
-
- if (queue.isFinished()) {
- LOG.trace("Queue {} is finished", queue);
- it.remove();
- retireQueue(queue);
- }
-
- return true;
- }
-
- LOG.debug("Failed to find completion for message {}", message);
- return false;
- }
-
- private void scheduleFlush() {
- if (parent.getChannel().isWritable()) {
- if (flushScheduled.compareAndSet(false, true)) {
- LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
- parent.getChannel().eventLoop().execute(flushRunnable);
- } else {
- LOG.trace("Flush task is already present on channel {}", parent.getChannel());
- }
- } else {
- LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
- }
- }
-
- void ensureFlushing(final OutboundQueueImpl queue) {
- Preconditions.checkState(currentQueue.equals(queue));
- scheduleFlush();
- }