/*
 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.openflowjava.protocol.impl.core.connection;

import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);

    /**
     * This is the default upper bound we place on the flush task running
     * a single iteration. We relinquish control after about this amount
     * of time.
     */
    private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);

    /**
     * We re-check the time spent flushing after every this many messages, because
     * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
     * or similar to disable the feature.
     */
    private static final int WORKTIME_RECHECK_MSGS = 64;

    /**
     * Default low write watermark. Channel will become writable when the number of
     * outstanding bytes dips below this value.
     */
    private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;

    /**
     * Default high write watermark. Channel will become un-writable when the number of
     * outstanding bytes hits this value.
     */
    private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
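
    // With the defaults above, the channel reports un-writable once 256 KiB of output
    // is queued and becomes writable again only after the backlog drops under 128 KiB.
    // handlerAdded() below installs these values as the channel's write-buffer watermarks.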

    private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
    private final AtomicBoolean flushScheduled = new AtomicBoolean();
    private final ConnectionAdapterImpl parent;
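    // Null for connection-oriented (TCP) channels; for UDP this carries the datagram
    // target used when wrapping outgoing messages (see flush()).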
    private final InetSocketAddress address;
    private final long maxBarrierNanos;
    private final long maxWorkTime;
    private final T handler;

    // Updated from netty only
    private long lastBarrierNanos = System.nanoTime();
    private OutboundQueueCacheSlice slice;
    private OutboundQueueImpl currentQueue;
    private boolean barrierTimerEnabled;
    private int nonBarrierMessages;
    private long lastXid = 0;
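    // Non-null once channelInactive() has kicked off shutdown; holds the offset of the
    // first entry reserved during shutdown, against which the final flush is checked.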
    private Integer shutdownOffset;

    // Passed to executor to request triggering of flush
    private final Runnable flushRunnable = new Runnable() {
        @Override
        public void run() {
            flush();
        }
    };

    // Passed to executor to request a periodic barrier check
    private final Runnable barrierRunnable = new Runnable() {
        @Override
        public void run() {
            barrier();
        }
    };

    OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
            final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
        this.parent = Preconditions.checkNotNull(parent);
        this.handler = Preconditions.checkNotNull(handler);
        this.slice = Preconditions.checkNotNull(slice);
        Preconditions.checkArgument(maxBarrierNanos > 0);
        this.maxBarrierNanos = maxBarrierNanos;
        this.address = address;
        this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);

        LOG.debug("Queue manager instantiated with queue slice {}", slice);
        createQueue();
    }

    @Override
    public void close() {
        handler.onConnectionQueueChanged(null);
        if (slice != null) {
            // Release our hold on the shared queue cache slice
            slice.decRef();
            slice = null;
        }
    }

    private void createQueue() {
        final long baseXid = lastXid;
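        // Advance lastXid past this queue's span; the extra XID appears to leave room
        // for the queue's trailing barrier entry.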
        lastXid += slice.getQueueSize() + 1;

        final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
        activeQueues.add(queue);
        currentQueue = queue;
        handler.onConnectionQueueChanged(queue);
    }

    private void scheduleBarrierTimer(final long now) {
        long next = lastBarrierNanos + maxBarrierNanos;
        if (next < now) {
            LOG.trace("Attempted to schedule barrier in the past, resetting maximum");
            next = now + maxBarrierNanos;
        }

        final long delay = next - now;
        LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
        parent.getChannel().eventLoop().schedule(barrierRunnable, delay, TimeUnit.NANOSECONDS);
        barrierTimerEnabled = true;
    }

    private void scheduleBarrierMessage() {
        final Long xid = currentQueue.reserveBarrierIfNeeded();
        if (xid == null) {
            LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
            return;
        }

        currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
        LOG.trace("Barrier XID {} scheduled", xid);
    }

    /**
     * Flush an entry from the queue.
     *
     * @param now Time reference for 'now'. We take this as an argument, as
     *            we need a timestamp to mark barrier messages we see swinging
     *            by. That timestamp does not need to be completely accurate,
     *            hence we use the flush start time. Alternative would be to
     *            measure System.nanoTime() for each barrier -- needlessly
     *            adding overhead.
     *
     * @return Entry which was flushed, null if no entry is ready.
     */
    OfHeader flushEntry(final long now) {
        final OfHeader message = currentQueue.flushEntry();
        if (currentQueue.isFlushed()) {
            LOG.debug("Queue {} is fully flushed", currentQueue);
            createQueue();
        }

        if (message == null) {
            return null;
        }

        if (message instanceof BarrierInput) {
            LOG.trace("Barrier message seen, resetting counters");
            nonBarrierMessages = 0;
            lastBarrierNanos = now;
        } else {
            nonBarrierMessages++;
            if (nonBarrierMessages >= slice.getQueueSize()) {
                LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
                scheduleBarrierMessage();
            } else if (!barrierTimerEnabled) {
                scheduleBarrierTimer(now);
            }
        }

        return message;
    }

    /**
     * Invoked whenever a message comes in from the switch. Runs matching
     * on all active queues in an attempt to complete a previous request.
     *
     * @param message Potential response message
     * @return True if the message matched a previous request, false otherwise.
     */
    boolean onMessage(final OfHeader message) {
        LOG.trace("Attempting to pair message {} to a request", message);

        Iterator<OutboundQueueImpl> it = activeQueues.iterator();
        while (it.hasNext()) {
            final OutboundQueueImpl queue = it.next();
            final OutboundQueueEntry entry = queue.pairRequest(message);
            if (entry == null) {
                continue;
            }

            LOG.trace("Queue {} accepted response {}", queue, message);

            // This has been a barrier request, we need to flush all
            // previous queues
            if (entry.isBarrier() && activeQueues.size() > 1) {
                LOG.trace("Queue {} indicated request was a barrier", queue);

                it = activeQueues.iterator();
                while (it.hasNext()) {
                    final OutboundQueueImpl q = it.next();

                    // We want to complete all queues before the current one, we will
                    // complete the current queue below
                    if (!queue.equals(q)) {
                        LOG.trace("Queue {} is implied finished", q);
                        q.completeAll();
                        it.remove();
                        slice.putQueue(q);
                    } else {
                        break;
                    }
                }
            }

            if (queue.isFinished()) {
                LOG.trace("Queue {} is finished", queue);
                it.remove();
                slice.putQueue(queue);
            }

            return true;
        }

        LOG.debug("Failed to find completion for message {}", message);
        return false;
    }

    private void scheduleFlush() {
        if (flushScheduled.compareAndSet(false, true)) {
            LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
            parent.getChannel().eventLoop().execute(flushRunnable);
        } else {
            LOG.trace("Flush task is already present on channel {}", parent.getChannel());
        }
    }

    void ensureFlushing(final OutboundQueueImpl queue) {
        Preconditions.checkState(currentQueue.equals(queue));
        scheduleFlush();
    }

    /**
     * Periodic barrier check.
     */
    protected void barrier() {
        LOG.debug("Channel {} barrier timer expired", parent.getChannel());
        barrierTimerEnabled = false;
        if (shutdownOffset != null) {
            LOG.trace("Channel shut down, not processing barrier");
            return;
        }

        final long now = System.nanoTime();
        final long sinceLast = now - lastBarrierNanos;
        if (sinceLast >= maxBarrierNanos) {
            LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
            // FIXME: we should be tracking requests/responses instead of this
            if (nonBarrierMessages == 0) {
                LOG.trace("No messages written since last barrier, not issuing one");
            } else {
                scheduleBarrierMessage();
            }
        }
    }

    private void rescheduleFlush() {
        /*
         * We are almost ready to terminate. This is a bit tricky, because
         * we do not want to have a race window where a message would be
         * stuck on the queue without a flush being scheduled.
         *
         * So we mark ourselves as not running and then re-check if a
         * flush is needed. That will re-synchronize with other threads
         * such that only one flush is scheduled at any given time.
         */
        if (!flushScheduled.compareAndSet(true, false)) {
            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
        }

        conditionalFlush();
    }

    private void shutdownFlush() {
        long entries = 0;

        // Fail all requests still waiting on the active queues
        final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
        while (it.hasNext()) {
            final OutboundQueueImpl queue = it.next();

            entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
            if (queue.isFinished()) {
                LOG.trace("Cleared queue {}", queue);
                it.remove();
            }
        }

        LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());

        Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
        if (currentQueue.isShutdown(shutdownOffset)) {
            currentQueue = null;
            handler.onConnectionQueueChanged(null);
            LOG.debug("Channel {} shutdown complete", parent.getChannel());
        } else {
            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
            rescheduleFlush();
        }
    }

    /**
     * Perform a single flush operation. We keep it here so we do not generate
     * synthetic accessors for private fields. Otherwise it could be moved into
     * {@link #flushRunnable}.
     */
    protected void flush() {
        // If the channel is gone, just flush whatever is not completed
        if (shutdownOffset != null) {
            shutdownFlush();
            return;
        }

        final long start = System.nanoTime();
        final long deadline = start + maxWorkTime;

        LOG.debug("Dequeuing messages to channel {}", parent.getChannel());

        long messages = 0;
        for (;; ++messages) {
            if (!parent.getChannel().isWritable()) {
                LOG.debug("Channel {} is no longer writable", parent.getChannel());
                break;
            }

            final OfHeader message = flushEntry(start);
            if (message == null) {
                LOG.trace("The queue is completely drained");
                break;
            }

            final Object wrapper;
            if (address == null) {
                wrapper = new MessageListenerWrapper(message, null);
            } else {
                wrapper = new UdpMessageListenerWrapper(message, null, address);
            }
            parent.getChannel().write(wrapper);

            /*
             * Check every WORKTIME_RECHECK_MSGS for exceeded time.
             *
             * XXX: given we already measure our flushing throughput, we
             *      should be able to perform dynamic adjustments here.
             *      Is that additional complexity needed, though?
             */
            if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
                LOG.trace("Exceeded allotted work time {}us",
                        TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
                break;
            }
        }

        if (messages > 0) {
            LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
            parent.getChannel().flush();
        }

        final long stop = System.nanoTime();
        LOG.debug("Flushed {} messages in {}us to channel {}",
                messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());

        rescheduleFlush();
    }

    /**
     * Schedule a queue flush if it is not empty and the channel is found
     * to be writable. May only be called from Netty context.
     */
    private void conditionalFlush() {
        if (currentQueue.needsFlush()) {
            if (shutdownOffset != null || parent.getChannel().isWritable()) {
                scheduleFlush();
            } else {
                LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
            }
        } else {
            LOG.trace("Queue is empty, no flush needed");
        }
    }

    private void conditionalFlush(final ChannelHandlerContext ctx) {
        Preconditions.checkState(ctx.channel().equals(parent.getChannel()),
                "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
        conditionalFlush();
    }

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        conditionalFlush(ctx);
    }

    @Override
    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
        /*
         * Tune channel write buffering. We increase the writability window
         * to ensure we can flush an entire queue segment in one go. We definitely
         * want to keep the difference above 64k, as that will ensure we use jam-packed
         * TCP packets. UDP will fragment as appropriate.
         */
        ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
        ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);

        super.handlerAdded(ctx);
    }

    @Override
    public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);

        // We are on the event loop already, so run the flush directly rather than
        // scheduling it through the executor.
        if (flushScheduled.compareAndSet(false, true)) {
            LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
            flush();
        } else {
            LOG.debug("Channel {} writability changed, but task is already scheduled", parent.getChannel());
        }
    }

    @Override
    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);

        LOG.debug("Channel {} initiating shutdown...", parent.getChannel());

        /*
         * We are dealing with a multi-threaded shutdown, as the user may still
         * be reserving entries in the queue. We are executing in a netty thread,
         * so neither flush nor barrier can be running, which is good news.
         *
         * We will eat up all the slots in the queue here, mark the first
         * reserved offset and free up all the cached queues. We then schedule
         * the flush task, which will deal with the rest of the shutdown process.
         */
        shutdownOffset = currentQueue.startShutdown();
        if (slice != null) {
            slice.decRef();
            slice = null;
        }

        LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
        scheduleFlush();
    }

    @Override
    public String toString() {
        return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
    }

    void onEchoRequest(final EchoRequestMessage message) {
        final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
                .setVersion(message.getVersion()).setXid(message.getXid()).build();
        parent.getChannel().writeAndFlush(reply);
    }
}