Merge "Use String(byte[], Charset)"
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ChannelOutboundQueue.java
1 /*
2  * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static com.google.common.base.Preconditions.checkState;
12 import static java.util.Objects.requireNonNull;
13
14 import io.netty.channel.Channel;
15 import io.netty.channel.ChannelFuture;
16 import io.netty.channel.ChannelHandlerContext;
17 import io.netty.channel.ChannelInboundHandlerAdapter;
18 import io.netty.util.concurrent.EventExecutor;
19 import io.netty.util.concurrent.Future;
20 import io.netty.util.concurrent.GenericFutureListener;
21 import java.net.InetSocketAddress;
22 import java.util.Queue;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * Channel handler which bypasses wraps on top of normal Netty pipeline, allowing
32  * writes to be enqueued from any thread, it then schedules a task pipeline task,
33  * which shuffles messages from the queue into the pipeline.
34  *
35  * <p>
36  * Note this is an *Inbound* handler, as it reacts to channel writability changing,
37  * which in the Netty vocabulary is an inbound event. This has already changed in
38  * the Netty 5.0.0 API, where Handlers are unified.
39  */
40 final class ChannelOutboundQueue extends ChannelInboundHandlerAdapter {
41     public interface MessageHolder<T> {
42         /**
43          * Take ownership of the encapsulated listener. Guaranteed to
44          * be called at most once.
45          *
46          * @return listener encapsulated in the holder, may be null
47          * @throws IllegalStateException if the listener is no longer
48          *         available (for example because it has already been
49          *         taken).
50          */
51         GenericFutureListener<Future<Void>> takeListener();
52
53         /**
54          * Take ownership of the encapsulated message. Guaranteed to be
55          * called at most once.
56          *
57          * @return message encapsulated in the holder, may not be null
58          * @throws IllegalStateException if the message is no longer
59          *         available (for example because it has already been
60          *         taken).
61          */
62         T takeMessage();
63     }
64
65     /**
66      * This is the default upper bound we place on the flush task running
67      * a single iteration. We relinquish control after about this amount
68      * of time.
69      */
70     private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
71
72     /**
73      * We re-check the time spent flushing every this many messages. We do this because
74      * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
75      * or similar to disable the feature.
76      */
77     private static final int WORKTIME_RECHECK_MSGS = 64;
78     private static final Logger LOG = LoggerFactory.getLogger(ChannelOutboundQueue.class);
79
80     // Passed to executor to request triggering of flush
81     private final Runnable flushRunnable = ChannelOutboundQueue.this::flush;
82
83     /*
84      * Instead of using an AtomicBoolean object, we use these two. It saves us
85      * from allocating an extra object.
86      */
87     private static final AtomicIntegerFieldUpdater<ChannelOutboundQueue> FLUSH_SCHEDULED_UPDATER =
88             AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundQueue.class, "flushScheduled");
89     private volatile int flushScheduled = 0;
90
91     private final Queue<MessageHolder<?>> queue;
92     private final long maxWorkTime;
93     private final Channel channel;
94     private final InetSocketAddress address;
95
96     ChannelOutboundQueue(final Channel channel, final int queueDepth, final InetSocketAddress address) {
97         checkArgument(queueDepth > 0, "Queue depth has to be positive");
98
99         /*
100          * This looks like a good trade-off for throughput. Alternative is
101          * to use an ArrayBlockingQueue -- but that uses a single lock to
102          * synchronize both producers and consumers, potentially leading
103          * to less throughput.
104          */
105         this.queue = new LinkedBlockingQueue<>(queueDepth);
106         this.channel = requireNonNull(channel);
107         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
108         this.address = address;
109     }
110
111     /**
112      * Enqueue a message holder for transmission. Is a thread-safe entry point
113      * for the channel. If the cannot be placed on the queue, this
114      *
115      * @param holder MessageHolder which should be enqueue
116      * @return Success indicator, true if the enqueue operation succeeded,
117      *         false if the queue is full.
118      */
119     public boolean enqueue(final MessageHolder<?> holder) {
120         LOG.trace("Enqueuing message {}", holder);
121         if (queue.offer(holder)) {
122             LOG.trace("Message enqueued");
123             conditionalFlush();
124             return true;
125         }
126
127         LOG.debug("Message queue is full");
128         return false;
129     }
130
131     private void scheduleFlush(final EventExecutor executor) {
132         if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
133             LOG.trace("Scheduling flush task");
134             executor.execute(flushRunnable);
135         } else {
136             LOG.trace("Flush task is already present");
137         }
138     }
139
140     /**
141      * Schedule a queue flush if it is not empty and the channel is found
142      * to be writable.
143      */
144     private void conditionalFlush() {
145         if (queue.isEmpty()) {
146             LOG.trace("Queue is empty, flush not needed");
147             return;
148         }
149         if (!channel.isWritable()) {
150             LOG.trace("Channel {} is not writable, not issuing a flush", channel);
151             return;
152         }
153
154         scheduleFlush(channel.pipeline().lastContext().executor());
155     }
156
157     private void conditionalFlush(final ChannelHandlerContext ctx) {
158         checkState(ctx.channel().equals(channel), "Inconsistent channel %s with context %s", channel, ctx);
159         conditionalFlush();
160     }
161
162     /*
163      * The synchronized keyword should be unnecessary, really, but it enforces
164      * queue order should something go terribly wrong. It should be completely
165      * uncontended.
166      */
167     private synchronized void flush() {
168
169         final long start = System.nanoTime();
170         final long deadline = start + maxWorkTime;
171
172         LOG.debug("Dequeuing messages to channel {}", channel);
173
174         long messages = 0;
175         for (;; ++messages) {
176             if (!channel.isWritable()) {
177                 LOG.trace("Channel is no longer writable");
178                 break;
179             }
180
181             final MessageHolder<?> h = queue.poll();
182             if (h == null) {
183                 LOG.trace("The queue is completely drained");
184                 break;
185             }
186
187             final GenericFutureListener<Future<Void>> l = h.takeListener();
188
189             final ChannelFuture p;
190             if (address == null) {
191                 p = channel.write(new MessageListenerWrapper(h.takeMessage(), l));
192             } else {
193                 p = channel.write(new UdpMessageListenerWrapper(h.takeMessage(), l, address));
194             }
195             if (l != null) {
196                 p.addListener(l);
197             }
198
199             /*
200              * Check every WORKTIME_RECHECK_MSGS for exceeded time.
201              *
202              * XXX: given we already measure our flushing throughput, we
203              *      should be able to perform dynamic adjustments here.
204              *      is that additional complexity needed, though?
205              */
206             if (messages % WORKTIME_RECHECK_MSGS == 0 && System.nanoTime() >= deadline) {
207                 LOG.trace("Exceeded allotted work time {}us",
208                         TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
209                 break;
210             }
211         }
212
213         if (messages > 0) {
214             LOG.debug("Flushing {} message(s) to channel {}", messages, channel);
215             channel.flush();
216         }
217
218         if (LOG.isDebugEnabled()) {
219             final long stop = System.nanoTime();
220             LOG.debug("Flushed {} messages in {}us to channel {}",
221                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), channel);
222         }
223
224         /*
225          * We are almost ready to terminate. This is a bit tricky, because
226          * we do not want to have a race window where a message would be
227          * stuck on the queue without a flush being scheduled.
228          *
229          * So we mark ourselves as not running and then re-check if a
230          * flush out is needed. That will re-synchronized with other threads
231          * such that only one flush is scheduled at any given time.
232          */
233         if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
234             LOG.warn("Channel {} queue {} flusher found unscheduled", channel, queue);
235         }
236
237         conditionalFlush();
238     }
239
240     @Override
241     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
242         super.channelActive(ctx);
243         conditionalFlush(ctx);
244     }
245
246     @Override
247     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
248         super.channelWritabilityChanged(ctx);
249         conditionalFlush(ctx);
250     }
251
252     @Override
253     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
254         super.channelInactive(ctx);
255
256         long entries = 0;
257         LOG.debug("Channel shutdown, flushing queue...");
258         final Future<Void> result = ctx.newFailedFuture(new RejectedExecutionException("Channel disconnected"));
259         while (true) {
260             final MessageHolder<?> e = queue.poll();
261             if (e == null) {
262                 break;
263             }
264
265             e.takeListener().operationComplete(result);
266             entries++;
267         }
268
269         LOG.debug("Flushed {} queue entries", entries);
270     }
271
272     @Override
273     public String toString() {
274         return String.format("Channel %s queue [%s messages flushing=%s]", channel, queue.size(), flushScheduled);
275     }
276 }