/*
 * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
14 import io.netty.channel.Channel;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelHandlerContext;
17 import io.netty.channel.ChannelInboundHandlerAdapter;
18 import io.netty.util.concurrent.EventExecutor;
19 import io.netty.util.concurrent.Future;
20 import io.netty.util.concurrent.GenericFutureListener;
21 import java.net.InetSocketAddress;
22 import java.util.Queue;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
/**
 * Channel handler which bypasses wraps on top of normal Netty pipeline, allowing
 * writes to be enqueued from any thread, it then schedules a task pipeline task,
 * which shuffles messages from the queue into the pipeline.
 *
 * <p>Note this is an *Inbound* handler, as it reacts to channel writability changing,
 * which in the Netty vocabulary is an inbound event. This has already changed in
 * the Netty 5.0.0 API, where Handlers are unified.
 */
40 final class ChannelOutboundQueue extends ChannelInboundHandlerAdapter {
41 public interface MessageHolder<T> {
43 * Take ownership of the encapsulated listener. Guaranteed to
44 * be called at most once.
46 * @return listener encapsulated in the holder, may be null
47 * @throws IllegalStateException if the listener is no longer
48 * available (for example because it has already been
51 GenericFutureListener<Future<Void>> takeListener();
54 * Take ownership of the encapsulated message. Guaranteed to be
55 * called at most once.
57 * @return message encapsulated in the holder, may not be null
58 * @throws IllegalStateException if the message is no longer
59 * available (for example because it has already been
66 * This is the default upper bound we place on the flush task running
67 * a single iteration. We relinquish control after about this amount
70 private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
73 * We re-check the time spent flushing every this many messages. We do this because
74 * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
75 * or similar to disable the feature.
77 private static final int WORKTIME_RECHECK_MSGS = 64;
78 private static final Logger LOG = LoggerFactory.getLogger(ChannelOutboundQueue.class);
80 // Passed to executor to request triggering of flush
81 private final Runnable flushRunnable = ChannelOutboundQueue.this::flush;
84 * Instead of using an AtomicBoolean object, we use these two. It saves us
85 * from allocating an extra object.
87 private static final AtomicIntegerFieldUpdater<ChannelOutboundQueue> FLUSH_SCHEDULED_UPDATER =
88 AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundQueue.class, "flushScheduled");
89 private volatile int flushScheduled = 0;
91 private final Queue<MessageHolder<?>> queue;
92 private final long maxWorkTime;
93 private final Channel channel;
94 private final InetSocketAddress address;
96 ChannelOutboundQueue(final Channel channel, final int queueDepth, final InetSocketAddress address) {
97 checkArgument(queueDepth > 0, "Queue depth has to be positive");
100 * This looks like a good trade-off for throughput. Alternative is
101 * to use an ArrayBlockingQueue -- but that uses a single lock to
102 * synchronize both producers and consumers, potentially leading
103 * to less throughput.
105 this.queue = new LinkedBlockingQueue<>(queueDepth);
106 this.channel = requireNonNull(channel);
107 this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
108 this.address = address;
112 * Enqueue a message holder for transmission. Is a thread-safe entry point
113 * for the channel. If the cannot be placed on the queue, this
115 * @param holder MessageHolder which should be enqueue
116 * @return Success indicator, true if the enqueue operation succeeded,
117 * false if the queue is full.
119 public boolean enqueue(final MessageHolder<?> holder) {
120 LOG.trace("Enqueuing message {}", holder);
121 if (queue.offer(holder)) {
122 LOG.trace("Message enqueued");
127 LOG.debug("Message queue is full");
131 private void scheduleFlush(final EventExecutor executor) {
132 if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
133 LOG.trace("Scheduling flush task");
134 executor.execute(flushRunnable);
136 LOG.trace("Flush task is already present");
141 * Schedule a queue flush if it is not empty and the channel is found
144 private void conditionalFlush() {
145 if (queue.isEmpty()) {
146 LOG.trace("Queue is empty, flush not needed");
149 if (!channel.isWritable()) {
150 LOG.trace("Channel {} is not writable, not issuing a flush", channel);
154 scheduleFlush(channel.pipeline().lastContext().executor());
157 private void conditionalFlush(final ChannelHandlerContext ctx) {
158 checkState(ctx.channel().equals(channel), "Inconsistent channel %s with context %s", channel, ctx);
163 * The synchronized keyword should be unnecessary, really, but it enforces
164 * queue order should something go terribly wrong. It should be completely
167 private synchronized void flush() {
169 final long start = System.nanoTime();
170 final long deadline = start + maxWorkTime;
172 LOG.debug("Dequeuing messages to channel {}", channel);
175 for (;; ++messages) {
176 if (!channel.isWritable()) {
177 LOG.trace("Channel is no longer writable");
181 final MessageHolder<?> h = queue.poll();
183 LOG.trace("The queue is completely drained");
187 final GenericFutureListener<Future<Void>> l = h.takeListener();
189 final ChannelFuture p;
190 if (address == null) {
191 p = channel.write(new MessageListenerWrapper(h.takeMessage(), l));
193 p = channel.write(new UdpMessageListenerWrapper(h.takeMessage(), l, address));
200 * Check every WORKTIME_RECHECK_MSGS for exceeded time.
202 * XXX: given we already measure our flushing throughput, we
203 * should be able to perform dynamic adjustments here.
204 * is that additional complexity needed, though?
206 if (messages % WORKTIME_RECHECK_MSGS == 0 && System.nanoTime() >= deadline) {
207 LOG.trace("Exceeded allotted work time {}us",
208 TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
214 LOG.debug("Flushing {} message(s) to channel {}", messages, channel);
218 if (LOG.isDebugEnabled()) {
219 final long stop = System.nanoTime();
220 LOG.debug("Flushed {} messages in {}us to channel {}",
221 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), channel);
225 * We are almost ready to terminate. This is a bit tricky, because
226 * we do not want to have a race window where a message would be
227 * stuck on the queue without a flush being scheduled.
229 * So we mark ourselves as not running and then re-check if a
230 * flush out is needed. That will re-synchronized with other threads
231 * such that only one flush is scheduled at any given time.
233 if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
234 LOG.warn("Channel {} queue {} flusher found unscheduled", channel, queue);
241 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
242 super.channelActive(ctx);
243 conditionalFlush(ctx);
247 public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
248 super.channelWritabilityChanged(ctx);
249 conditionalFlush(ctx);
253 public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
254 super.channelInactive(ctx);
257 LOG.debug("Channel shutdown, flushing queue...");
258 final Future<Void> result = ctx.newFailedFuture(new RejectedExecutionException("Channel disconnected"));
260 final MessageHolder<?> e = queue.poll();
265 e.takeListener().operationComplete(result);
269 LOG.debug("Flushed {} queue entries", entries);
273 public String toString() {
274 return String.format("Channel %s queue [%s messages flushing=%s]", channel, queue.size(), flushScheduled);