2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import io.netty.channel.ChannelHandlerContext;
13 import io.netty.channel.ChannelInboundHandlerAdapter;
14 import java.net.InetSocketAddress;
15 import java.util.ArrayDeque;
16 import java.util.Iterator;
17 import java.util.LinkedList;
18 import java.util.Queue;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
22 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
28 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
29 private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
32 * This is the default upper bound we place on the flush task running
33 * a single iteration. We relinquish control after about this amount of time (in microseconds).
36 private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
39 * We re-check the time spent flushing every this many messages. We do this because
40 * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
41 * or similar to disable the feature.
43 private static final int WORKTIME_RECHECK_MSGS = 64;
46 * We maintain a cache of this many previous queues for later reuse.
48 private static final int QUEUE_CACHE_SIZE = 4;
// Retired queues kept for reuse: retireQueue() offers into this cache and createQueue() polls from it.
// NOTE(review): the ArrayDeque constructor argument is only an initial capacity; ArrayDeque itself is unbounded.
50 private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
// Queues still eligible for response matching: appended to by createQueue(), walked by onMessage()
// and emptied by channelInactive().
51 private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
// Connection adapter whose channel is used for writability checks, writes and event-loop scheduling.
52 private final ConnectionAdapterImpl parent;
// May be null: flush() wraps messages in MessageListenerWrapper when it is null and in
// UdpMessageListenerWrapper (carrying this address) otherwise.
53 private final InetSocketAddress address;
// Barrier interval in nanoseconds; the constructor requires it to be positive.
54 private final long maxBarrierNanos;
// Work-time budget in nanoseconds; flush() adds it to its start time to obtain a deadline.
55 private final long maxWorkTime;
// Configured queue size; the constructor requires it to be positive. flushEntry() requests a barrier
// once this many non-barrier messages have been counted.
56 private final int queueSize;
// Callback notified through onConnectionQueueChanged() and asked to build barrier requests.
57 private final T handler;
60 * Instead of using an AtomicBoolean object, we use these two. It saves us
61 * from allocating an extra object.
63 @SuppressWarnings("rawtypes")
64 private static final AtomicIntegerFieldUpdater<OutboundQueueManager> FLUSH_SCHEDULED_UPDATER =
65 AtomicIntegerFieldUpdater.newUpdater(OutboundQueueManager.class, "flushScheduled");
// Flag toggled via FLUSH_SCHEDULED_UPDATER: scheduleFlush() moves it 0 -> 1, flush() moves it 1 -> 0.
66 private volatile int flushScheduled = 0;
68 // Updated from netty only
// System.nanoTime() reading of the last barrier; flushEntry() overwrites it when a BarrierInput passes by.
69 private long lastBarrierNanos = System.nanoTime();
// Queue currently being filled and flushed; assigned by createQueue(). barrier() treats null as "channel shut down".
70 private OutboundQueueImpl currentQueue;
// Non-barrier messages counted by flushEntry() since the counter was last reset.
71 private int nonBarrierMessages;
// Base XID handed to the next queue; createQueue() advances it by queueSize + 1.
72 private long lastXid = 0;
74 // Passed to executor to request triggering of flush
75 private final Runnable flushRunnable = new Runnable() {
81 private final Runnable barrierRunnable = new Runnable() {
/**
 * Creates a queue manager for the given connection and arms the first barrier timer.
 *
 * @param parent connection adapter providing the channel; rejected by checkNotNull() when null
 * @param address remote address stored as-is; it is not validated here and flush() handles a null value
 * @param handler queue handler; rejected by checkNotNull() when null
 * @param queueSize queue size; rejected by checkArgument() unless greater than zero
 * @param maxBarrierNanos barrier interval in nanoseconds; rejected by checkArgument() unless greater than zero
 */
88 OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
89 final int queueSize, final long maxBarrierNanos) {
90 this.parent = Preconditions.checkNotNull(parent);
91 this.handler = Preconditions.checkNotNull(handler);
92 Preconditions.checkArgument(queueSize > 0);
93 this.queueSize = queueSize;
94 Preconditions.checkArgument(maxBarrierNanos > 0);
95 this.maxBarrierNanos = maxBarrierNanos;
96 this.address = address;
// The default budget is expressed in microseconds; keep it in nanoseconds to match System.nanoTime().
97 this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
99 LOG.debug("Queue manager instantiated with queue size {}", queueSize);
// The construction-time value of lastBarrierNanos doubles as "now" for the first timer.
101 scheduleBarrierTimer(lastBarrierNanos);
/**
 * Notifies the handler of a queue change, passing null as the new queue.
 */
109 public void close() {
110 handler.onConnectionQueueChanged(null);
113 private void retireQueue(final OutboundQueueImpl queue) {
114 if (queueCache.offer(queue)) {
115 LOG.debug("Saving queue {} for later reuse", queue);
117 LOG.debug("Queue {} thrown away", queue);
/**
 * Makes a fresh queue the current one: takes the next base XID, reuses a cached queue when
 * one is available or allocates a new one, adds it to the active queues and notifies the handler.
 */
121 private void createQueue() {
// Claim the next block of XIDs; each queue advances the base by queueSize + 1.
122 final long baseXid = lastXid;
123 lastXid += queueSize + 1;
// poll() yields null when the cache is empty.
125 final OutboundQueueImpl cached = queueCache.poll();
126 final OutboundQueueImpl queue;
127 if (cached != null) {
// reuse() may hand back a different object, which is why both are logged.
128 queue = cached.reuse(baseXid);
129 LOG.debug("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
// NOTE(review): the line separating the two assignments is missing from this listing; since 'queue'
// is final they must sit on alternative branches.
131 queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
132 LOG.debug("Allocated new queue {} on channel {}", queue, parent.getChannel());
// Register the queue for response matching before publishing it to the handler.
135 activeQueues.add(queue);
136 currentQueue = queue;
137 handler.onConnectionQueueChanged(queue);
/**
 * Schedules barrierRunnable on the channel's event loop.
 *
 * @param now reference timestamp, on the System.nanoTime() scale, from which the delay is measured
 */
140 private void scheduleBarrierTimer(final long now) {
// Nominal expiry: one barrier interval after the last recorded barrier.
141 long next = lastBarrierNanos + maxBarrierNanos;
// NOTE(review): the condition guarding the next two statements is not visible in this listing;
// judging by the log text it covers 'next' lying in the past -- confirm against the full source.
// NOTE(review): the log text below ends with a stray ')'.
143 LOG.debug("Attempted to schedule barrier in the past, reset maximum)");
144 next = now + maxBarrierNanos;
147 final long delay = next - now;
148 LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
// NOTE(review): 'delay' already holds next - now and could be passed here directly.
149 parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
/**
 * Reserves an entry in the current queue, commits a handler-created barrier request into it
 * and resets the non-barrier message counter.
 */
152 private void scheduleBarrierMessage() {
// NOTE(review): the 'true' argument presumably marks the reservation as a barrier -- verify in OutboundQueueImpl.
153 final Long xid = currentQueue.reserveEntry(true);
// A barrier reservation is expected to always succeed; fail loudly if no XID came back.
154 Verify.verifyNotNull(xid);
// The request is committed with a null third argument.
156 currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
157 LOG.debug("Barrier XID {} scheduled", xid);
159 // We can see into the future when compared to flushEntry(), as that
160 // codepath may be lagging behind on messages. Resetting the counter
161 // here ensures that flushEntry() will not attempt to issue another barrier
162 // request. Note that we do not reset current time, as that should
163 // reflect when we sent the message for real.
164 nonBarrierMessages = 0;
168 * Flush an entry from the queue.
170 * @param now Time reference for 'now'. We take this as an argument, as
171 * we need a timestamp to mark barrier messages we see swinging
172 * by. That timestamp does not need to be completely accurate,
173 * hence we use the flush start time. Alternative would be to
174 * measure System.nanoTime() for each barrier, which would be needlessly expensive.
177 * @return Entry which was flushed, null if no entry is ready.
179 OfHeader flushEntry(final long now) {
// Ask the current queue for its next ready message; the null check below shows it may have none.
180 final OfHeader message = currentQueue.flushEntry();
181 if (currentQueue.isFlushed()) {
182 LOG.debug("Queue {} is fully flushed", currentQueue);
// NOTE(review): the remainder of this branch is missing from this listing.
// NOTE(review): the body of the null check is missing from this listing; per the documented contract a
// null result is expected when no entry is ready.
186 if (message == null) {
190 if (message instanceof BarrierInput) {
191 LOG.debug("Barrier message seen, resetting counters");
// A barrier going out restarts the count and is stamped with the caller-supplied time.
192 nonBarrierMessages = 0;
193 lastBarrierNanos = now;
// NOTE(review): the line introducing the non-barrier branch is missing from this listing.
195 nonBarrierMessages++;
196 if (nonBarrierMessages >= queueSize) {
197 LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
// scheduleBarrierMessage() also resets nonBarrierMessages to zero.
198 scheduleBarrierMessage();
206 * Invoked whenever a message comes in from the switch. Runs matching
207 * on all active queues in an attempt to complete a previous request.
209 * @param message Potential response message
210 * @return True if the message matched a previous request, false otherwise.
212 boolean onMessage(final OfHeader message) {
213 LOG.debug("Attempting to pair message {} to a request", message);
// Walk the active queues in insertion order, asking each to pair the message with one of its entries.
215 Iterator<OutboundQueueImpl> it = activeQueues.iterator();
216 while (it.hasNext()) {
217 final OutboundQueueImpl queue = it.next();
218 final OutboundQueueEntry entry = queue.pairRequest(message);
// NOTE(review): the handling of a queue that did not pair the message (original lines 219-223) is
// missing from this listing; 'entry' is dereferenced below, so a null result must be handled there.
224 LOG.debug("Queue {} accepted response {}", queue, message);
226 // This has been a barrier request, we need to flush all queues that precede this one
228 if (entry.isBarrier() && activeQueues.size() > 1) {
229 LOG.debug("Queue {} indicated request was a barrier", queue);
// Restart iteration from the head; this replaces the outer iterator.
231 it = activeQueues.iterator();
232 while (it.hasNext()) {
233 final OutboundQueueImpl q = it.next();
235 // We want to complete all queues before the current one, we will
236 // complete the current queue below
237 if (!queue.equals(q)) {
238 LOG.debug("Queue {} is implied finished", q);
// NOTE(review): the statements that complete/remove 'q' and end this inner loop (original lines
// 239-247) are missing from this listing.
248 if (queue.isFinished()) {
249 LOG.debug("Queue {} is finished", queue);
// NOTE(review): the handling of a finished queue and the return for a successful match (original
// lines 250-256) are missing from this listing.
// Reached when no active queue paired the message.
257 LOG.debug("Failed to find completion for message {}", message);
261 private void scheduleFlush() {
262 if (parent.getChannel().isWritable()) {
263 if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
264 LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
265 parent.getChannel().eventLoop().execute(flushRunnable);
267 LOG.trace("Flush task is already present on channel {}", parent.getChannel());
270 LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
/**
 * Verifies that the supplied queue is the current queue.
 *
 * NOTE(review): whatever follows the state check is missing from this listing; the method name
 * suggests a flush is requested afterwards -- confirm against the full source.
 *
 * @param queue queue the caller believes to be current
 */
274 void ensureFlushing(final OutboundQueueImpl queue) {
// checkState() rejects a mismatching queue; a null currentQueue fails earlier on the equals() call.
275 Preconditions.checkState(currentQueue.equals(queue));
280 * Periodic barrier check.
282 protected void barrier() {
283 LOG.debug("Channel {} barrier timer expired", parent.getChannel());
// Without a current queue there is nothing to put a barrier into.
284 if (currentQueue == null) {
285 LOG.debug("Channel shut down, not processing barrier");
// NOTE(review): the remainder of this branch is missing from this listing.
289 final long now = System.nanoTime();
// Difference of two nanoTime() readings: time elapsed since the last barrier.
290 final long sinceLast = now - lastBarrierNanos;
291 if (sinceLast >= maxBarrierNanos) {
292 LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
293 // FIXME: we should be tracking requests/responses instead of this
// A zero counter means flushEntry() has counted no non-barrier message since the last reset.
294 if (nonBarrierMessages == 0) {
295 LOG.debug("No messages written since last barrier, not issuing one");
// NOTE(review): the line separating the two outcomes is missing from this listing; presumably
// the barrier below is only issued when the counter is non-zero.
297 scheduleBarrierMessage();
// Re-arm the timer relative to the timestamp taken above.
301 scheduleBarrierTimer(now);
305 * Perform a single flush operation.
307 protected void flush() {
// The start time doubles as the timestamp handed to flushEntry() and as the base of the work deadline.
308 final long start = System.nanoTime();
309 final long deadline = start + maxWorkTime;
311 LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
// NOTE(review): the declaration of 'messages' is missing from this listing, as are the statements
// that leave this otherwise endless loop after each of the log calls below.
314 for (;; ++messages) {
// Stop writing as soon as the channel reports it is not writable.
315 if (!parent.getChannel().isWritable()) {
316 LOG.trace("Channel is no longer writable");
320 final OfHeader message = flushEntry(start);
321 if (message == null) {
322 LOG.trace("The queue is completely drained");
// Pick the wrapper type by whether a remote address was configured; both get a null second argument.
326 final Object wrapper;
327 if (address == null) {
328 wrapper = new MessageListenerWrapper(message, null);
330 wrapper = new UdpMessageListenerWrapper(message, null, address);
// write() only queues the wrapper on the channel; the flush() call further down pushes it out.
332 parent.getChannel().write(wrapper);
335 * Check every WORKTIME_RECHECK_MSGS for exceeded time.
337 * XXX: given we already measure our flushing throughput, we
338 * should be able to perform dynamic adjustments here.
339 * is that additional complexity needed, though?
// NOTE(review): the modulo test also holds when the counter is 0, so if it starts at zero the clock
// is read on the first message too.
// NOTE(review): the System.nanoTime() documentation recommends comparing readings by their
// difference (t1 - t0 >= 0) rather than directly, to stay correct across numerical overflow.
341 if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
342 LOG.trace("Exceeded allotted work time {}us",
343 TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
// NOTE(review): any condition guarding this flush is missing from this listing.
349 LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
350 parent.getChannel().flush();
353 final long stop = System.nanoTime();
354 LOG.debug("Flushed {} messages in {}us to channel {}",
355 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
358 * We are almost ready to terminate. This is a bit tricky, because
359 * we do not want to have a race window where a message would be
360 * stuck on the queue without a flush being scheduled.
362 * So we mark ourselves as not running and then re-check if a
363 * flush out is needed. That will re-synchronized with other threads
364 * such that only one flush is scheduled at any given time.
// Clear the scheduled flag; finding it already clear is unexpected and only logged.
366 if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
367 LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
// NOTE(review): the re-check announced by the comment above is missing from this listing.
375 * Schedule a queue flush if it is not empty and the channel is found
376 * to be writable. May only be called from Netty context.
378 private void conditionalFlush() {
// NOTE(review): the action taken for a non-empty queue is missing from this listing; per the method
// documentation it is expected to schedule a flush.
379 if (!currentQueue.isEmpty()) {
// Logged on the alternative path, i.e. when the current queue reports itself empty.
382 LOG.trace("Queue is empty, no flush needed");
/**
 * Variant taking the Netty context: verifies that the context belongs to this manager's channel.
 *
 * NOTE(review): the statement following the check is missing from this listing; presumably it
 * delegates to the no-argument conditionalFlush() -- confirm against the full source.
 *
 * @param ctx context of the channel event being handled
 */
386 private void conditionalFlush(final ChannelHandlerContext ctx) {
// checkState() rejects a context whose channel differs from the parent's channel.
387 Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
/**
 * Forwards the channel-active event to the superclass, then runs the conditional flush check
 * for this context.
 *
 * @param ctx context of the channel that became active
 * @throws Exception as declared; super.channelActive() is the only visible call declaring it
 */
392 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
393 super.channelActive(ctx);
394 conditionalFlush(ctx);
/**
 * Forwards the writability-change event to the superclass, then runs the conditional flush
 * check for this context.
 *
 * @param ctx context of the channel whose writability changed
 * @throws Exception as declared; super.channelWritabilityChanged() is the only visible call declaring it
 */
398 public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
399 super.channelWritabilityChanged(ctx);
400 conditionalFlush(ctx);
/**
 * Handles channel shutdown: forwards the event to the superclass, notifies the handler with a
 * null queue, fails every entry of every active queue and clears the active queue list.
 *
 * NOTE(review): no visible statement resets currentQueue, which barrier() checks against null to
 * detect a shut-down channel -- confirm against the full source.
 *
 * @param ctx context of the channel that became inactive
 * @throws Exception as declared; super.channelInactive() is the only visible call declaring it
 */
404 public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
405 super.channelInactive(ctx);
// NOTE(review): the declaration of 'entries' is missing from this listing.
408 LOG.debug("Channel shutdown, flushing queue...");
409 handler.onConnectionQueueChanged(null);
// A single exception instance is handed to every queue's failAll().
411 final Throwable cause = new RejectedExecutionException("Channel disconnected");
412 for (OutboundQueueImpl queue : activeQueues) {
// failAll() returns a number that is accumulated for the summary log below.
413 entries += queue.failAll(cause);
415 activeQueues.clear();
417 LOG.debug("Flushed {} queue entries", entries);
/**
 * Describes this manager by its parent channel and the current value of the flushScheduled flag.
 *
 * @return formatted description string
 */
421 public String toString() {
422 return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled);