Speedup ChannelOutboundQueue flush end
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ChannelOutboundQueue.java
1 /*
2  * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10
11 import com.google.common.base.Preconditions;
12 import io.netty.channel.Channel;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.ChannelHandlerContext;
15 import io.netty.channel.ChannelInboundHandlerAdapter;
16 import io.netty.util.concurrent.EventExecutor;
17 import io.netty.util.concurrent.Future;
18 import io.netty.util.concurrent.GenericFutureListener;
19 import java.net.InetSocketAddress;
20 import java.util.Queue;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.RejectedExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * Channel handler which bypasses wraps on top of normal Netty pipeline, allowing
30  * writes to be enqueued from any thread, it then schedules a task pipeline task,
31  * which shuffles messages from the queue into the pipeline.
32  *
33  * Note this is an *Inbound* handler, as it reacts to channel writability changing,
34  * which in the Netty vocabulary is an inbound event. This has already changed in
35  * the Netty 5.0.0 API, where Handlers are unified.
36  */
37 final class ChannelOutboundQueue extends ChannelInboundHandlerAdapter {
38     public interface MessageHolder<T> {
39         /**
40          * Take ownership of the encapsulated listener. Guaranteed to
41          * be called at most once.
42          *
43          * @return listener encapsulated in the holder, may be null
44          * @throws IllegalStateException if the listener is no longer
45          *         available (for example because it has already been
46          *         taken).
47          */
48         GenericFutureListener<Future<Void>> takeListener();
49
50         /**
51          * Take ownership of the encapsulated message. Guaranteed to be
52          * called at most once.
53          *
54          * @return message encapsulated in the holder, may not be null
55          * @throws IllegalStateException if the message is no longer
56          *         available (for example because it has already been
57          *         taken).
58          */
59         T takeMessage();
60     }
61
62     /**
63      * This is the default upper bound we place on the flush task running
64      * a single iteration. We relinquish control after about this amount
65      * of time.
66      */
67     private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
68
69     /**
70      * We re-check the time spent flushing every this many messages. We do this because
71      * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
72      * or similar to disable the feature.
73      */
74     private static final int WORKTIME_RECHECK_MSGS = 64;
75     private static final Logger LOG = LoggerFactory.getLogger(ChannelOutboundQueue.class);
76
77     // Passed to executor to request triggering of flush
78     private final Runnable flushRunnable = new Runnable() {
79         @Override
80         public void run() {
81             ChannelOutboundQueue.this.flush();
82         }
83     };
84
85     /*
86      * Instead of using an AtomicBoolean object, we use these two. It saves us
87      * from allocating an extra object.
88      */
89     private static final AtomicIntegerFieldUpdater<ChannelOutboundQueue> FLUSH_SCHEDULED_UPDATER =
90             AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundQueue.class, "flushScheduled");
91     private volatile int flushScheduled = 0;
92
93     private final Queue<MessageHolder<?>> queue;
94     private final long maxWorkTime;
95     private final Channel channel;
96     private final InetSocketAddress address;
97
98     public ChannelOutboundQueue(final Channel channel, final int queueDepth, final InetSocketAddress address) {
99         Preconditions.checkArgument(queueDepth > 0, "Queue depth has to be positive");
100
101         /*
102          * This looks like a good trade-off for throughput. Alternative is
103          * to use an ArrayBlockingQueue -- but that uses a single lock to
104          * synchronize both producers and consumers, potentially leading
105          * to less throughput.
106          */
107         this.queue = new LinkedBlockingQueue<>(queueDepth);
108         this.channel = Preconditions.checkNotNull(channel);
109         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
110         this.address = address;
111     }
112
113     /**
114      * Enqueue a message holder for transmission. Is a thread-safe entry point
115      * for the channel. If the cannot be placed on the queue, this
116      *
117      * @param holder MessageHolder which should be enqueue
118      * @return Success indicator, true if the enqueue operation succeeded,
119      *         false if the queue is full.
120      */
121     public boolean enqueue(final MessageHolder<?> holder) {
122         LOG.trace("Enqueuing message {}", holder);
123         if (queue.offer(holder)) {
124             LOG.trace("Message enqueued");
125             conditionalFlush();
126             return true;
127         }
128
129         LOG.debug("Message queue is full");
130         return false;
131     }
132
133     private void scheduleFlush(final EventExecutor executor) {
134         if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
135             LOG.trace("Scheduling flush task");
136             executor.execute(flushRunnable);
137         } else {
138             LOG.trace("Flush task is already present");
139         }
140     }
141
142     /**
143      * Schedule a queue flush if it is not empty and the channel is found
144      * to be writable.
145      */
146     private void conditionalFlush() {
147         if (queue.isEmpty()) {
148             LOG.trace("Queue is empty, not flush needed");
149             return;
150         }
151         if (!channel.isWritable()) {
152             LOG.trace("Channel {} is not writable, not issuing a flush", channel);
153             return;
154         }
155
156         scheduleFlush(channel.pipeline().lastContext().executor());
157     }
158
159     /*
160      * The synchronized keyword should be unnecessary, really, but it enforces
161      * queue order should something go terribly wrong. It should be completely
162      * uncontended.
163      */
164     private synchronized void flush() {
165
166         final long start = System.nanoTime();
167         final long deadline = start + maxWorkTime;
168
169         LOG.debug("Dequeuing messages to channel {}", channel);
170
171         long messages = 0;
172         for (;; ++messages) {
173             if (!channel.isWritable()) {
174                 LOG.trace("Channel is no longer writable");
175                 break;
176             }
177
178             final MessageHolder<?> h = queue.poll();
179             if (h == null) {
180                 LOG.trace("The queue is completely drained");
181                 break;
182             }
183
184             final GenericFutureListener<Future<Void>> l = h.takeListener();
185
186             final ChannelFuture p;
187             if (address == null) {
188                 p = channel.write(new MessageListenerWrapper(h.takeMessage(), l));
189             } else {
190                 p = channel.write(new UdpMessageListenerWrapper(h.takeMessage(), l, address));
191             }
192             if (l != null) {
193                 p.addListener(l);
194             }
195
196             /*
197              * Check every WORKTIME_RECHECK_MSGS for exceeded time.
198              *
199              * XXX: given we already measure our flushing throughput, we
200              *      should be able to perform dynamic adjustments here.
201              *      is that additional complexity needed, though?
202              */
203             if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
204                 LOG.trace("Exceeded allotted work time {}us",
205                         TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
206                 break;
207             }
208         }
209
210         if (messages > 0) {
211             LOG.debug("Flushing {} message(s) to channel {}", messages, channel);
212             channel.flush();
213         }
214
215         if (LOG.isDebugEnabled()) {
216             final long stop = System.nanoTime();
217             LOG.debug("Flushed {} messages in {}us to channel {}",
218                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), channel);
219         }
220
221         /*
222          * We are almost ready to terminate. This is a bit tricky, because
223          * we do not want to have a race window where a message would be
224          * stuck on the queue without a flush being scheduled.
225          *
226          * So we mark ourselves as not running and then re-check if a
227          * flush out is needed. That will re-synchronized with other threads
228          * such that only one flush is scheduled at any given time.
229          */
230         if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
231             LOG.warn("Channel {} queue {} flusher found unscheduled", channel, queue);
232         }
233
234         conditionalFlush();
235     }
236
237     private void conditionalFlush(final ChannelHandlerContext ctx) {
238         Preconditions.checkState(ctx.channel().equals(channel), "Inconsistent channel %s with context %s", channel, ctx);
239         conditionalFlush();
240     }
241
242     @Override
243     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
244         super.channelActive(ctx);
245         conditionalFlush(ctx);
246     }
247
248     @Override
249     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
250         super.channelWritabilityChanged(ctx);
251         conditionalFlush(ctx);
252     }
253
254     @Override
255     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
256         super.channelInactive(ctx);
257
258         long entries = 0;
259         LOG.debug("Channel shutdown, flushing queue...");
260         final Future<Void> result = ctx.newFailedFuture(new RejectedExecutionException("Channel disconnected"));
261         while (true) {
262             final MessageHolder<?> e = queue.poll();
263             if (e == null) {
264                 break;
265             }
266
267             e.takeListener().operationComplete(result);
268             entries++;
269         }
270
271         LOG.debug("Flushed {} queue entries", entries);
272     }
273
274     @Override
275     public String toString() {
276         return String.format("Channel %s queue [%s messages flushing=%s]", channel, queue.size(), flushScheduled);
277     }
278 }