Optimize time checking in flush()
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / connection / ChannelOutboundQueue.java
1 /*
2  * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.connection;
10
11 import io.netty.channel.Channel;
12 import io.netty.channel.ChannelFuture;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelInboundHandlerAdapter;
15 import io.netty.util.concurrent.EventExecutor;
16 import io.netty.util.concurrent.Future;
17 import io.netty.util.concurrent.GenericFutureListener;
18
19 import java.util.Queue;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import java.util.concurrent.RejectedExecutionException;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import com.google.common.base.Preconditions;
29
30 /**
31  * Channel handler which bypasses wraps on top of normal Netty pipeline, allowing
32  * writes to be enqueued from any thread, it then schedules a task pipeline task,
33  * which shuffles messages from the queue into the pipeline.
34  *
35  * Note this is an *Inbound* handler, as it reacts to channel writability changing,
36  * which in the Netty vocabulary is an inbound event. This has already changed in
37  * the Netty 5.0.0 API, where Handlers are unified.
38  */
39 final class ChannelOutboundQueue extends ChannelInboundHandlerAdapter {
40     public interface MessageHolder<T> {
41         /**
42          * Take ownership of the encapsulated listener. Guaranteed to
43          * be called at most once.
44          *
45          * @return listener encapsulated in the holder, may be null
46          * @throws IllegalStateException if the listener is no longer
47          *         available (for example because it has already been
48          *         taken).
49          */
50         GenericFutureListener<Future<Void>> takeListener();
51
52         /**
53          * Take ownership of the encapsulated message. Guaranteed to be
54          * called at most once.
55          *
56          * @return message encapsulated in the holder, may not be null
57          * @throws IllegalStateException if the message is no longer
58          *         available (for example because it has already been
59          *         taken).
60          */
61         T takeMessage();
62     }
63
64     /**
65      * This is the default upper bound we place on the flush task running
66      * a single iteration. We relinquish control after about this amount
67      * of time.
68      */
69     private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
70
71     /**
72      * We re-check the time spent flushing every this many messages. We do this because
73      * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
74      * or similar to disable the feature.
75      */
76     private static final int WORKTIME_RECHECK_MSGS = 64;
77     private static final Logger LOG = LoggerFactory.getLogger(ChannelOutboundQueue.class);
78
79     // Passed to executor to request triggering of flush
80     private final Runnable flushRunnable = new Runnable() {
81         @Override
82         public void run() {
83             ChannelOutboundQueue.this.flush();
84         }
85     };
86
87     /*
88      * Instead of using an AtomicBoolean object, we use these two. It saves us
89      * from allocating an extra object.
90      */
91     private static final AtomicIntegerFieldUpdater<ChannelOutboundQueue> FLUSH_SCHEDULED_UPDATER =
92             AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundQueue.class, "flushScheduled");
93     private volatile int flushScheduled = 0;
94
95     private final Queue<MessageHolder<?>> queue;
96     private final long maxWorkTime;
97     private final Channel channel;
98
99     public ChannelOutboundQueue(final Channel channel, final int queueDepth) {
100         Preconditions.checkArgument(queueDepth > 0, "Queue depth has to be positive");
101
102         /*
103          * This looks like a good trade-off for throughput. Alternative is
104          * to use an ArrayBlockingQueue -- but that uses a single lock to
105          * synchronize both producers and consumers, potentially leading
106          * to less throughput.
107          */
108         this.queue = new LinkedBlockingQueue<>(queueDepth);
109         this.channel = Preconditions.checkNotNull(channel);
110         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
111     }
112
113     /**
114      * Enqueue a message holder for transmission. Is a thread-safe entry point
115      * for the channel. If the cannot be placed on the queue, this
116      *
117      * @param holder MessageHolder which should be enqueue
118      * @return Success indicator, true if the enqueue operation succeeded,
119      *         false if the queue is full.
120      */
121     public boolean enqueue(final MessageHolder<?> holder) {
122         LOG.trace("Enqueuing message {}", holder);
123         if (queue.offer(holder)) {
124             LOG.trace("Message enqueued");
125             conditionalFlush();
126             return true;
127         }
128
129         LOG.trace("Message queue is full");
130         return false;
131     }
132
133     private void scheduleFlush(final EventExecutor executor) {
134         if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
135             LOG.trace("Scheduling flush task");
136             executor.execute(flushRunnable);
137         } else {
138             LOG.trace("Flush task is already present");
139         }
140     }
141
142     /**
143      * Schedule a queue flush if it is not empty and the channel is found
144      * to be writable.
145      */
146     private void conditionalFlush() {
147         if (queue.isEmpty()) {
148             LOG.trace("Queue is empty, not flush needed");
149             return;
150         }
151         if (!channel.isWritable()) {
152             LOG.trace("Channel {} is not writable, not issuing a flush", channel);
153             return;
154         }
155
156         scheduleFlush(channel.pipeline().lastContext().executor());
157     }
158
159     /*
160      * The synchronized keyword should be unnecessary, really, but it enforces
161      * queue order should something go terribly wrong. It should be completely
162      * uncontended.
163      */
164     private synchronized void flush() {
165         final long start = System.nanoTime();
166         final long deadline = start + maxWorkTime;
167
168         LOG.debug("Dequeuing messages to channel {}", channel);
169
170         long messages = 0;
171         for (;; ++messages) {
172             if (!channel.isWritable()) {
173                 LOG.trace("Channel is no longer writable");
174                 break;
175             }
176
177             final MessageHolder<?> h = queue.poll();
178             if (h == null) {
179                 LOG.trace("The queue is completely drained");
180                 break;
181             }
182
183             final ChannelFuture p = channel.write(h.takeMessage());
184             final GenericFutureListener<Future<Void>> l = h.takeListener();
185             if (l != null) {
186                 p.addListener(l);
187             }
188
189             /*
190              * Check every WORKTIME_RECHECK_MSGS for exceeded time.
191              *
192              * XXX: given we already measure our flushing throughput, we
193              *      should be able to perform dynamic adjustments here.
194              *      is that additional complexity needed, though?
195              */
196             if ((messages % WORKTIME_RECHECK_MSGS) == 0) {
197                 if (System.nanoTime() >= deadline) {
198                     LOG.trace("Exceeded allotted work time {}us",
199                             TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
200                     break;
201                 }
202             }
203         }
204
205         if (messages > 0) {
206             LOG.debug("Flushing {} message(s) to channel {}", messages, channel);
207             channel.flush();
208         }
209
210         final long stop = System.nanoTime();
211         LOG.debug("Flushed {} messages in {}us to channel {}",
212                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), channel);
213
214         /*
215          * We are almost ready to terminate. This is a bit tricky, because
216          * we do not want to have a race window where a message would be
217          * stuck on the queue without a flush being scheduled.
218          *
219          * So we mark ourselves as not running and then re-check if a
220          * flush out is needed. That will re-synchronized with other threads
221          * such that only one flush is scheduled at any given time.
222          */
223         if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
224             LOG.warn("Channel {} queue {} flusher found unscheduled", channel, queue);
225         }
226
227         conditionalFlush();
228     }
229
230     private void conditionalFlush(final ChannelHandlerContext ctx) {
231         Preconditions.checkState(ctx.channel() == channel, "Inconsistent channel %s with context %s", channel, ctx);
232         conditionalFlush();
233     }
234
235     @Override
236     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
237         super.channelActive(ctx);
238         conditionalFlush(ctx);
239     }
240
241     @Override
242     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
243         super.channelWritabilityChanged(ctx);
244         conditionalFlush(ctx);
245     }
246
247     @Override
248     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
249         super.channelInactive(ctx);
250
251         long entries = 0;
252         LOG.debug("Channel shutdown, flushing queue...");
253         final Future<Void> result = ctx.newFailedFuture(new RejectedExecutionException("Channel disconnected"));
254         while (true) {
255             final MessageHolder<?> e = queue.poll();
256             if (e == null) {
257                 break;
258             }
259
260             e.takeListener().operationComplete(result);
261             entries++;
262         }
263
264         LOG.debug("Flushed {} queue entries", entries);
265     }
266
267     @Override
268     public String toString() {
269         return String.format("Channel %s queue [%s messages flushing=%s]", channel, queue.size(), flushScheduled);
270     }
271 }