/*
 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelInboundHandlerAdapter;
13 import java.net.InetSocketAddress;
14 import java.util.ArrayDeque;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.Queue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicBoolean;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
21 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
/**
 * Manager of the outbound message queues of a single connection. Installed in
 * the Netty pipeline as an inbound handler so it can react to channel
 * activation, writability changes and shutdown, while pairing incoming
 * responses with previously-sent requests.
 *
 * NOTE(review): a number of lines of this file (branch bodies, closing braces)
 * are not visible in this chunk; comments describe only the visible code.
 */
final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);

    /*
     * This is the default upper bound we place on the flush task running
     * a single iteration. We relinquish control after about this amount
     * of time.
     */
    private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);

    /*
     * We re-check the time spent flushing every this many messages. We do this because
     * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
     * or similar to disable the feature.
     */
    private static final int WORKTIME_RECHECK_MSGS = 64;

    /*
     * We maintain a cache of this many previous queues for later reuse.
     */
    private static final int QUEUE_CACHE_SIZE = 4;

    // Retired queues kept around for reuse, bounded by QUEUE_CACHE_SIZE.
    private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
    // Queues which still have outstanding requests, oldest first.
    private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
    // Ensures at most one flush task is outstanding at any given time.
    private final AtomicBoolean flushScheduled = new AtomicBoolean();
    private final ConnectionAdapterImpl parent;
    // When non-null, outgoing messages are wrapped for datagram delivery to
    // this address (see flush()); null selects the plain wrapper.
    private final InetSocketAddress address;
    // Maximum interval between barrier messages, in nanoseconds.
    private final long maxBarrierNanos;
    // Upper bound on a single flush iteration, in nanoseconds.
    private final long maxWorkTime;
    private final int queueSize;
    private final T handler;

    // Updated from netty only
    private long lastBarrierNanos = System.nanoTime();
    private OutboundQueueImpl currentQueue;
    private boolean barrierTimerEnabled;
    // Messages flushed since the last barrier; a barrier is forced once this
    // reaches queueSize (see flushEntry()).
    private int nonBarrierMessages;
    private long lastXid = 0;
    // Non-null once channel shutdown has started (set in channelInactive()).
    private Integer shutdownOffset;

    // Passed to executor to request triggering of flush
    // NOTE(review): the anonymous class body is not visible in this chunk.
    private final Runnable flushRunnable = new Runnable() {

    // Passed to executor to request a periodic barrier check
    // NOTE(review): the anonymous class body is not visible in this chunk.
    private final Runnable barrierRunnable = new Runnable() {
/**
 * Create a queue manager bound to a connection adapter.
 *
 * @param parent backing connection adapter, may not be null
 * @param address remote address used to wrap writes for datagram channels
 *                (see flush()); null for stream channels
 * @param handler user-supplied queue handler, may not be null
 * @param queueSize number of request slots in each queue, must be positive
 * @param maxBarrierNanos maximum interval between barrier messages in
 *                        nanoseconds, must be positive
 */
OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
        final int queueSize, final long maxBarrierNanos) {
    this.parent = Preconditions.checkNotNull(parent);
    this.handler = Preconditions.checkNotNull(handler);
    Preconditions.checkArgument(queueSize > 0);
    this.queueSize = queueSize;
    Preconditions.checkArgument(maxBarrierNanos > 0);
    this.maxBarrierNanos = maxBarrierNanos;
    this.address = address;
    // Convert the configured microsecond bound to nanoseconds once, up front.
    this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);

    LOG.debug("Queue manager instantiated with queue size {}", queueSize);
    // NOTE(review): trailing statements/closing brace not visible in this chunk.
/**
 * Close this manager, detaching the user handler from any queue.
 */
public void close() {
    // Inform the handler that no queue is available any more.
    handler.onConnectionQueueChanged(null);
110 private void retireQueue(final OutboundQueueImpl queue) {
111 if (queueCache.offer(queue)) {
112 LOG.trace("Saving queue {} for later reuse", queue);
114 LOG.trace("Queue {} thrown away", queue);
118 private void createQueue() {
119 final long baseXid = lastXid;
120 lastXid += queueSize + 1;
122 final OutboundQueueImpl cached = queueCache.poll();
123 final OutboundQueueImpl queue;
124 if (cached != null) {
125 queue = cached.reuse(baseXid);
126 LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
128 queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
129 LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
132 activeQueues.add(queue);
133 currentQueue = queue;
134 handler.onConnectionQueueChanged(queue);
137 private void scheduleBarrierTimer(final long now) {
138 long next = lastBarrierNanos + maxBarrierNanos;
140 LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
141 next = now + maxBarrierNanos;
144 final long delay = next - now;
145 LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
146 parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
147 barrierTimerEnabled = true;
150 private void scheduleBarrierMessage() {
151 final Long xid = currentQueue.reserveBarrierIfNeeded();
153 LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
157 currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
158 LOG.trace("Barrier XID {} scheduled", xid);
/*
 * Flush an entry from the queue.
 *
 * @param now Time reference for 'now'. We take this as an argument, as
 *            we need a timestamp to mark barrier messages we see swinging
 *            by. That timestamp does not need to be completely accurate,
 *            hence we use the flush start time. Alternative would be to
 *            measure System.nanoTime() for each barrier -- needlessly
 *            adding overhead.
 *
 * @return Entry which was flushed, null if no entry is ready.
 */
// NOTE(review): several lines of this method (branch bodies, returns and
// closing braces) are not visible in this chunk.
OfHeader flushEntry(final long now) {
    final OfHeader message = currentQueue.flushEntry();
    // Once the current queue is fully flushed, roll over to a fresh one.
    if (currentQueue.isFlushed()) {
        LOG.debug("Queue {} is fully flushed", currentQueue);

    if (message == null) {

    // Barriers reset both the message counter and the barrier clock.
    if (message instanceof BarrierInput) {
        LOG.trace("Barrier message seen, resetting counters");
        nonBarrierMessages = 0;
        lastBarrierNanos = now;

    nonBarrierMessages++;
    // Force a barrier after queueSize non-barrier messages; otherwise make
    // sure the barrier timer is ticking so one is issued eventually.
    if (nonBarrierMessages >= queueSize) {
        LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
        scheduleBarrierMessage();
    } else if (!barrierTimerEnabled) {
        scheduleBarrierTimer(now);
/*
 * Invoked whenever a message comes in from the switch. Runs matching
 * on all active queues in an attempt to complete a previous request.
 *
 * @param message Potential response message
 * @return True if the message matched a previous request, false otherwise.
 */
// NOTE(review): parts of this method (the null-check on the paired entry,
// queue removal/retirement and the return statements) are not visible in
// this chunk.
boolean onMessage(final OfHeader message) {
    LOG.trace("Attempting to pair message {} to a request", message);

    // Walk active queues oldest-first, looking for the matching request.
    Iterator<OutboundQueueImpl> it = activeQueues.iterator();
    while (it.hasNext()) {
        final OutboundQueueImpl queue = it.next();
        final OutboundQueueEntry entry = queue.pairRequest(message);

        LOG.trace("Queue {} accepted response {}", queue, message);

        // This has been a barrier request, we need to flush all
        // queues accumulated before it.
        if (entry.isBarrier() && activeQueues.size() > 1) {
            LOG.trace("Queue {} indicated request was a barrier", queue);

            it = activeQueues.iterator();
            while (it.hasNext()) {
                final OutboundQueueImpl q = it.next();

                // We want to complete all queues before the current one, we will
                // complete the current queue below
                if (!queue.equals(q)) {
                    LOG.trace("Queue {} is implied finished", q);

        if (queue.isFinished()) {
            LOG.trace("Queue {} is finished", queue);

    // No queue claimed the message.
    LOG.debug("Failed to find completion for message {}", message);
257 private void scheduleFlush() {
258 if (flushScheduled.compareAndSet(false, true)) {
259 LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
260 parent.getChannel().eventLoop().execute(flushRunnable);
262 LOG.trace("Flush task is already present on channel {}", parent.getChannel());
/**
 * Ensure that the given queue is being flushed. May only be invoked for
 * the current queue.
 *
 * @param queue queue requesting the flush; must equal the current queue
 */
void ensureFlushing(final OutboundQueueImpl queue) {
    // Only the current queue is allowed to request flushing.
    Preconditions.checkState(currentQueue.equals(queue));
    // NOTE(review): the remainder of this method is not visible in this chunk.
/*
 * Periodic barrier check.
 */
// NOTE(review): early-return bodies and closing braces of this method are
// not visible in this chunk.
protected void barrier() {
    LOG.debug("Channel {} barrier timer expired", parent.getChannel());
    // The one-shot timer has fired; it must be re-armed explicitly.
    barrierTimerEnabled = false;
    if (shutdownOffset != null) {
        LOG.trace("Channel shut down, not processing barrier");

    final long now = System.nanoTime();
    final long sinceLast = now - lastBarrierNanos;
    // Only issue a barrier if the full interval elapsed without one.
    if (sinceLast >= maxBarrierNanos) {
        LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
        // FIXME: we should be tracking requests/responses instead of this
        if (nonBarrierMessages == 0) {
            LOG.trace("No messages written since last barrier, not issuing one");
        scheduleBarrierMessage();
/**
 * Transition the flusher out of its scheduled state, re-checking whether
 * another flush needs to be scheduled.
 */
// NOTE(review): the tail of this method is not visible in this chunk.
private void rescheduleFlush() {
    /*
     * We are almost ready to terminate. This is a bit tricky, because
     * we do not want to have a race window where a message would be
     * stuck on the queue without a flush being scheduled.
     *
     * So we mark ourselves as not running and then re-check if a
     * flush out is needed. That will re-synchronized with other threads
     * such that only one flush is scheduled at any given time.
     */
    if (!flushScheduled.compareAndSet(true, false)) {
        // Invariant violation: we were running without being marked scheduled.
        LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
/**
 * Shutdown-mode flush: fail all outstanding entries on the active queues
 * and finish tearing down once the current queue is fully drained.
 */
// NOTE(review): the entry-counter declaration, queue removal/retirement and
// closing braces are not visible in this chunk.
private void shutdownFlush() {
    final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
    while (it.hasNext()) {
        final OutboundQueueImpl queue = it.next();

        // Fail every outstanding entry with DEVICE_DISCONNECTED.
        entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
        if (queue.isFinished()) {
            LOG.trace("Cleared queue {}", queue);

    LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());

    Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
    if (currentQueue.isShutdown(shutdownOffset)) {

        // Detach the handler once everything has drained.
        handler.onConnectionQueueChanged(null);
        LOG.debug("Channel {} shutdown complete", parent.getChannel());

    LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
/*
 * Perform a single flush operation.
 */
// NOTE(review): the message-counter declaration, loop break statements and
// the rescheduling tail of this method are not visible in this chunk.
protected void flush() {
    // If the channel is gone, just flush whatever is not completed
    if (shutdownOffset != null) {

    final long start = System.nanoTime();
    // Bound this iteration so we relinquish the event loop periodically.
    final long deadline = start + maxWorkTime;

    LOG.debug("Dequeuing messages to channel {}", parent.getChannel());

    for (;; ++messages) {
        // Stop as soon as the channel cannot accept more writes.
        if (!parent.getChannel().isWritable()) {
            LOG.trace("Channel is no longer writable");

        final OfHeader message = flushEntry(start);
        if (message == null) {
            LOG.trace("The queue is completely drained");

        // Datagram channels need the remote address attached to each write;
        // stream channels use the plain wrapper.
        final Object wrapper;
        if (address == null) {
            wrapper = new MessageListenerWrapper(message, null);

            wrapper = new UdpMessageListenerWrapper(message, null, address);

        parent.getChannel().write(wrapper);

        /*
         * Check every WORKTIME_RECHECK_MSGS for exceeded time.
         *
         * XXX: given we already measure our flushing throughput, we
         *      should be able to perform dynamic adjustments here.
         *      is that additional complexity needed, though?
         */
        if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
            LOG.trace("Exceeded allotted work time {}us",
                TimeUnit.NANOSECONDS.toMicros(maxWorkTime));

    LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
    // Push everything written above out to the socket in one go.
    parent.getChannel().flush();

    final long stop = System.nanoTime();
    LOG.debug("Flushed {} messages in {}us to channel {}",
        messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
/*
 * Schedule a queue flush if it is not empty and the channel is found
 * to be writable. May only be called from Netty context.
 */
// NOTE(review): the flush-scheduling branch body is not visible in this chunk.
private void conditionalFlush() {
    if (currentQueue.needsFlush()) {

    LOG.trace("Queue is empty, no flush needed");
/**
 * Context-carrying variant of {@code conditionalFlush()}; verifies the
 * context belongs to our channel before proceeding.
 *
 * @param ctx Netty handler context for this event
 */
private void conditionalFlush(final ChannelHandlerContext ctx) {
    // Sanity check: this handler must only see events for its own channel.
    Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
    // NOTE(review): the remainder of this method is not visible in this chunk.
/**
 * Attempt a flush as soon as the channel becomes active.
 */
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    conditionalFlush(ctx);
/**
 * Re-attempt a flush whenever the channel's writability changes, so
 * messages parked during back-pressure get drained.
 */
public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
    super.channelWritabilityChanged(ctx);
    conditionalFlush(ctx);
/**
 * Initiate queue shutdown once the channel goes down.
 */
// NOTE(review): the queue-cache cleanup/flush-scheduling tail of this method
// is not visible in this chunk.
public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
    super.channelInactive(ctx);

    LOG.debug("Channel {} initiating shutdown...", parent.getChannel());

    /*
     * We are dealing with a multi-threaded shutdown, as the user may still
     * be reserving entries in the queue. We are executing in a netty thread,
     * so neither flush nor barrier can be running, which is good news.
     *
     * We will eat up all the slots in the queue here and mark the offset first
     * reserved offset and free up all the cached queues. We then schedule
     * the flush task, which will deal with the rest of the shutdown process.
     */
    shutdownOffset = currentQueue.startShutdown();

    LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
453 public String toString() {
454 return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
/**
 * Answer a switch echo request immediately, mirroring its data, version
 * and XID, writing directly to the channel (not through the queues).
 *
 * @param message incoming echo request
 */
void onEchoRequest(final EchoRequestMessage message) {
    final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
    // Write and flush in one step; echo replies should not wait on queue flushes.
    parent.getChannel().writeAndFlush(reply);