Use String(byte[], Charset)
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / ChannelOutboundQueue.java
1 /*
2  * Copyright (c) 2014 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10
11 import com.google.common.base.Preconditions;
12 import io.netty.channel.Channel;
13 import io.netty.channel.ChannelFuture;
14 import io.netty.channel.ChannelHandlerContext;
15 import io.netty.channel.ChannelInboundHandlerAdapter;
16 import io.netty.util.concurrent.EventExecutor;
17 import io.netty.util.concurrent.Future;
18 import io.netty.util.concurrent.GenericFutureListener;
19 import java.net.InetSocketAddress;
20 import java.util.Queue;
21 import java.util.concurrent.LinkedBlockingQueue;
22 import java.util.concurrent.RejectedExecutionException;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * Channel handler which bypasses wraps on top of normal Netty pipeline, allowing
30  * writes to be enqueued from any thread, it then schedules a task pipeline task,
31  * which shuffles messages from the queue into the pipeline.
32  *
33  * <p>
34  * Note this is an *Inbound* handler, as it reacts to channel writability changing,
35  * which in the Netty vocabulary is an inbound event. This has already changed in
36  * the Netty 5.0.0 API, where Handlers are unified.
37  */
38 final class ChannelOutboundQueue extends ChannelInboundHandlerAdapter {
39     public interface MessageHolder<T> {
40         /**
41          * Take ownership of the encapsulated listener. Guaranteed to
42          * be called at most once.
43          *
44          * @return listener encapsulated in the holder, may be null
45          * @throws IllegalStateException if the listener is no longer
46          *         available (for example because it has already been
47          *         taken).
48          */
49         GenericFutureListener<Future<Void>> takeListener();
50
51         /**
52          * Take ownership of the encapsulated message. Guaranteed to be
53          * called at most once.
54          *
55          * @return message encapsulated in the holder, may not be null
56          * @throws IllegalStateException if the message is no longer
57          *         available (for example because it has already been
58          *         taken).
59          */
60         T takeMessage();
61     }
62
63     /**
64      * This is the default upper bound we place on the flush task running
65      * a single iteration. We relinquish control after about this amount
66      * of time.
67      */
68     private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
69
70     /**
71      * We re-check the time spent flushing every this many messages. We do this because
72      * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
73      * or similar to disable the feature.
74      */
75     private static final int WORKTIME_RECHECK_MSGS = 64;
76     private static final Logger LOG = LoggerFactory.getLogger(ChannelOutboundQueue.class);
77
78     // Passed to executor to request triggering of flush
79     private final Runnable flushRunnable = ChannelOutboundQueue.this::flush;
80
81     /*
82      * Instead of using an AtomicBoolean object, we use these two. It saves us
83      * from allocating an extra object.
84      */
85     private static final AtomicIntegerFieldUpdater<ChannelOutboundQueue> FLUSH_SCHEDULED_UPDATER =
86             AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundQueue.class, "flushScheduled");
87     private volatile int flushScheduled = 0;
88
89     private final Queue<MessageHolder<?>> queue;
90     private final long maxWorkTime;
91     private final Channel channel;
92     private final InetSocketAddress address;
93
94     ChannelOutboundQueue(final Channel channel, final int queueDepth, final InetSocketAddress address) {
95         Preconditions.checkArgument(queueDepth > 0, "Queue depth has to be positive");
96
97         /*
98          * This looks like a good trade-off for throughput. Alternative is
99          * to use an ArrayBlockingQueue -- but that uses a single lock to
100          * synchronize both producers and consumers, potentially leading
101          * to less throughput.
102          */
103         this.queue = new LinkedBlockingQueue<>(queueDepth);
104         this.channel = Preconditions.checkNotNull(channel);
105         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
106         this.address = address;
107     }
108
109     /**
110      * Enqueue a message holder for transmission. Is a thread-safe entry point
111      * for the channel. If the cannot be placed on the queue, this
112      *
113      * @param holder MessageHolder which should be enqueue
114      * @return Success indicator, true if the enqueue operation succeeded,
115      *         false if the queue is full.
116      */
117     public boolean enqueue(final MessageHolder<?> holder) {
118         LOG.trace("Enqueuing message {}", holder);
119         if (queue.offer(holder)) {
120             LOG.trace("Message enqueued");
121             conditionalFlush();
122             return true;
123         }
124
125         LOG.debug("Message queue is full");
126         return false;
127     }
128
129     private void scheduleFlush(final EventExecutor executor) {
130         if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
131             LOG.trace("Scheduling flush task");
132             executor.execute(flushRunnable);
133         } else {
134             LOG.trace("Flush task is already present");
135         }
136     }
137
138     /**
139      * Schedule a queue flush if it is not empty and the channel is found
140      * to be writable.
141      */
142     private void conditionalFlush() {
143         if (queue.isEmpty()) {
144             LOG.trace("Queue is empty, flush not needed");
145             return;
146         }
147         if (!channel.isWritable()) {
148             LOG.trace("Channel {} is not writable, not issuing a flush", channel);
149             return;
150         }
151
152         scheduleFlush(channel.pipeline().lastContext().executor());
153     }
154
155     private void conditionalFlush(final ChannelHandlerContext ctx) {
156         Preconditions.checkState(ctx.channel().equals(channel),
157                 "Inconsistent channel %s with context %s", channel, ctx);
158         conditionalFlush();
159     }
160
161     /*
162      * The synchronized keyword should be unnecessary, really, but it enforces
163      * queue order should something go terribly wrong. It should be completely
164      * uncontended.
165      */
166     private synchronized void flush() {
167
168         final long start = System.nanoTime();
169         final long deadline = start + maxWorkTime;
170
171         LOG.debug("Dequeuing messages to channel {}", channel);
172
173         long messages = 0;
174         for (;; ++messages) {
175             if (!channel.isWritable()) {
176                 LOG.trace("Channel is no longer writable");
177                 break;
178             }
179
180             final MessageHolder<?> h = queue.poll();
181             if (h == null) {
182                 LOG.trace("The queue is completely drained");
183                 break;
184             }
185
186             final GenericFutureListener<Future<Void>> l = h.takeListener();
187
188             final ChannelFuture p;
189             if (address == null) {
190                 p = channel.write(new MessageListenerWrapper(h.takeMessage(), l));
191             } else {
192                 p = channel.write(new UdpMessageListenerWrapper(h.takeMessage(), l, address));
193             }
194             if (l != null) {
195                 p.addListener(l);
196             }
197
198             /*
199              * Check every WORKTIME_RECHECK_MSGS for exceeded time.
200              *
201              * XXX: given we already measure our flushing throughput, we
202              *      should be able to perform dynamic adjustments here.
203              *      is that additional complexity needed, though?
204              */
205             if (messages % WORKTIME_RECHECK_MSGS == 0 && System.nanoTime() >= deadline) {
206                 LOG.trace("Exceeded allotted work time {}us",
207                         TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
208                 break;
209             }
210         }
211
212         if (messages > 0) {
213             LOG.debug("Flushing {} message(s) to channel {}", messages, channel);
214             channel.flush();
215         }
216
217         if (LOG.isDebugEnabled()) {
218             final long stop = System.nanoTime();
219             LOG.debug("Flushed {} messages in {}us to channel {}",
220                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), channel);
221         }
222
223         /*
224          * We are almost ready to terminate. This is a bit tricky, because
225          * we do not want to have a race window where a message would be
226          * stuck on the queue without a flush being scheduled.
227          *
228          * So we mark ourselves as not running and then re-check if a
229          * flush out is needed. That will re-synchronized with other threads
230          * such that only one flush is scheduled at any given time.
231          */
232         if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
233             LOG.warn("Channel {} queue {} flusher found unscheduled", channel, queue);
234         }
235
236         conditionalFlush();
237     }
238
239     @Override
240     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
241         super.channelActive(ctx);
242         conditionalFlush(ctx);
243     }
244
245     @Override
246     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
247         super.channelWritabilityChanged(ctx);
248         conditionalFlush(ctx);
249     }
250
251     @Override
252     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
253         super.channelInactive(ctx);
254
255         long entries = 0;
256         LOG.debug("Channel shutdown, flushing queue...");
257         final Future<Void> result = ctx.newFailedFuture(new RejectedExecutionException("Channel disconnected"));
258         while (true) {
259             final MessageHolder<?> e = queue.poll();
260             if (e == null) {
261                 break;
262             }
263
264             e.takeListener().operationComplete(result);
265             entries++;
266         }
267
268         LOG.debug("Flushed {} queue entries", entries);
269     }
270
271     @Override
272     public String toString() {
273         return String.format("Channel %s queue [%s messages flushing=%s]", channel, queue.size(), flushScheduled);
274     }
275 }