Tune write low/highwatermark
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelInboundHandlerAdapter;
13 import java.net.InetSocketAddress;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.Queue;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
30     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
31
32     /**
33      * This is the default upper bound we place on the flush task running
34      * a single iteration. We relinquish control after about this amount
35      * of time.
36      */
37     private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
38
39     /**
40      * We re-check the time spent flushing every this many messages. We do this because
41      * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
42      * or similar to disable the feature.
43      */
44     private static final int WORKTIME_RECHECK_MSGS = 64;
45
46     /**
47      * Default low write watermark. Channel will become writable when number of outstanding
48      * bytes dips below this value.
49      */
50     private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
51
52     /**
53      * Default write high watermark. Channel will become un-writable when number of
54      * outstanding bytes hits this value.
55      */
56     private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
57
58
59     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
60     private final AtomicBoolean flushScheduled = new AtomicBoolean();
61     private final ConnectionAdapterImpl parent;
62     private final InetSocketAddress address;
63     private final long maxBarrierNanos;
64     private final long maxWorkTime;
65     private final T handler;
66
67     // Updated from netty only
68     private long lastBarrierNanos = System.nanoTime();
69     private OutboundQueueCacheSlice slice;
70     private OutboundQueueImpl currentQueue;
71     private boolean barrierTimerEnabled;
72     private int nonBarrierMessages;
73     private long lastXid = 0;
74     private Integer shutdownOffset;
75
76     // Passed to executor to request triggering of flush
77     private final Runnable flushRunnable = new Runnable() {
78         @Override
79         public void run() {
80             flush();
81         }
82     };
83
84     // Passed to executor to request a periodic barrier check
85     private final Runnable barrierRunnable = new Runnable() {
86         @Override
87         public void run() {
88             barrier();
89         }
90     };
91
92     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
93         final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
94         this.parent = Preconditions.checkNotNull(parent);
95         this.handler = Preconditions.checkNotNull(handler);
96         this.slice = Preconditions.checkNotNull(slice);
97         Preconditions.checkArgument(maxBarrierNanos > 0);
98         this.maxBarrierNanos = maxBarrierNanos;
99         this.address = address;
100         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
101
102         LOG.debug("Queue manager instantiated with queue slice {}", slice);
103         createQueue();
104     }
105
106     T getHandler() {
107         return handler;
108     }
109
110     @Override
111     public void close() {
112         handler.onConnectionQueueChanged(null);
113         if (slice != null) {
114             slice.decRef();
115             slice = null;
116         }
117     }
118
119     private void createQueue() {
120         final long baseXid = lastXid;
121         lastXid += slice.getQueueSize() + 1;
122
123         final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
124         activeQueues.add(queue);
125         currentQueue = queue;
126         handler.onConnectionQueueChanged(queue);
127     }
128
129     private void scheduleBarrierTimer(final long now) {
130         long next = lastBarrierNanos + maxBarrierNanos;
131         if (next < now) {
132             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
133             next = now + maxBarrierNanos;
134         }
135
136         final long delay = next - now;
137         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
138         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
139         barrierTimerEnabled = true;
140     }
141
142     private void scheduleBarrierMessage() {
143         final Long xid = currentQueue.reserveBarrierIfNeeded();
144         if (xid == null) {
145             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
146             return;
147         }
148
149         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
150         LOG.trace("Barrier XID {} scheduled", xid);
151     }
152
153     /**
154      * Flush an entry from the queue.
155      *
156      * @param now Time reference for 'now'. We take this as an argument, as
157      *            we need a timestamp to mark barrier messages we see swinging
158      *            by. That timestamp does not need to be completely accurate,
159      *            hence we use the flush start time. Alternative would be to
160      *            measure System.nanoTime() for each barrier -- needlessly
161      *            adding overhead.
162      *
163      * @return Entry which was flushed, null if no entry is ready.
164      */
165     OfHeader flushEntry(final long now) {
166         final OfHeader message = currentQueue.flushEntry();
167         if (currentQueue.isFlushed()) {
168             LOG.debug("Queue {} is fully flushed", currentQueue);
169             createQueue();
170         }
171
172         if (message == null) {
173             return null;
174         }
175
176         if (message instanceof BarrierInput) {
177             LOG.trace("Barrier message seen, resetting counters");
178             nonBarrierMessages = 0;
179             lastBarrierNanos = now;
180         } else {
181             nonBarrierMessages++;
182             if (nonBarrierMessages >= slice.getQueueSize()) {
183                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
184                 scheduleBarrierMessage();
185             } else if (!barrierTimerEnabled) {
186                 scheduleBarrierTimer(now);
187             }
188         }
189
190         return message;
191     }
192
193     /**
194      * Invoked whenever a message comes in from the switch. Runs matching
195      * on all active queues in an attempt to complete a previous request.
196      *
197      * @param message Potential response message
198      * @return True if the message matched a previous request, false otherwise.
199      */
200     boolean onMessage(final OfHeader message) {
201         LOG.trace("Attempting to pair message {} to a request", message);
202
203         Iterator<OutboundQueueImpl> it = activeQueues.iterator();
204         while (it.hasNext()) {
205             final OutboundQueueImpl queue = it.next();
206             final OutboundQueueEntry entry = queue.pairRequest(message);
207
208             if (entry == null) {
209                 continue;
210             }
211
212             LOG.trace("Queue {} accepted response {}", queue, message);
213
214             // This has been a barrier request, we need to flush all
215             // previous queues
216             if (entry.isBarrier() && activeQueues.size() > 1) {
217                 LOG.trace("Queue {} indicated request was a barrier", queue);
218
219                 it = activeQueues.iterator();
220                 while (it.hasNext()) {
221                     final OutboundQueueImpl q = it.next();
222
223                     // We want to complete all queues before the current one, we will
224                     // complete the current queue below
225                     if (!queue.equals(q)) {
226                         LOG.trace("Queue {} is implied finished", q);
227                         q.completeAll();
228                         it.remove();
229                         slice.putQueue(q);
230                     } else {
231                         break;
232                     }
233                 }
234             }
235
236             if (queue.isFinished()) {
237                 LOG.trace("Queue {} is finished", queue);
238                 it.remove();
239                 slice.putQueue(queue);
240             }
241
242             return true;
243         }
244
245         LOG.debug("Failed to find completion for message {}", message);
246         return false;
247     }
248
249     private void scheduleFlush() {
250         if (flushScheduled.compareAndSet(false, true)) {
251             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
252             parent.getChannel().eventLoop().execute(flushRunnable);
253         } else {
254             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
255         }
256     }
257
258     void ensureFlushing(final OutboundQueueImpl queue) {
259         Preconditions.checkState(currentQueue.equals(queue));
260         scheduleFlush();
261     }
262
263     /**
264      * Periodic barrier check.
265      */
266     protected void barrier() {
267         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
268         barrierTimerEnabled = false;
269         if (shutdownOffset != null) {
270             LOG.trace("Channel shut down, not processing barrier");
271             return;
272         }
273
274         final long now = System.nanoTime();
275         final long sinceLast = now - lastBarrierNanos;
276         if (sinceLast >= maxBarrierNanos) {
277             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
278             // FIXME: we should be tracking requests/responses instead of this
279             if (nonBarrierMessages == 0) {
280                 LOG.trace("No messages written since last barrier, not issuing one");
281             } else {
282                 scheduleBarrierMessage();
283             }
284         }
285     }
286
287     private void rescheduleFlush() {
288         /*
289          * We are almost ready to terminate. This is a bit tricky, because
290          * we do not want to have a race window where a message would be
291          * stuck on the queue without a flush being scheduled.
292          *
293          * So we mark ourselves as not running and then re-check if a
294          * flush out is needed. That will re-synchronized with other threads
295          * such that only one flush is scheduled at any given time.
296          */
297         if (!flushScheduled.compareAndSet(true, false)) {
298             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
299         }
300
301         conditionalFlush();
302     }
303
304     private void shutdownFlush() {
305         long entries = 0;
306
307         // Fail all queues
308         final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
309         while (it.hasNext()) {
310             final OutboundQueueImpl queue = it.next();
311
312             entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
313             if (queue.isFinished()) {
314                 LOG.trace("Cleared queue {}", queue);
315                 it.remove();
316             }
317         }
318
319         LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
320
321         Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
322         if (currentQueue.isShutdown(shutdownOffset)) {
323             currentQueue = null;
324             handler.onConnectionQueueChanged(null);
325             LOG.debug("Channel {} shutdown complete", parent.getChannel());
326         } else {
327             LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
328             rescheduleFlush();
329         }
330     }
331
332     /**
333      * Perform a single flush operation. We keep it here so we do not generate
334      * syntetic accessors for private fields. Otherwise it could be moved into
335      * {@link #flushRunnable}.
336      */
337     protected void flush() {
338         // If the channel is gone, just flush whatever is not completed
339         if (shutdownOffset != null) {
340             shutdownFlush();
341             return;
342         }
343
344         final long start = System.nanoTime();
345         final long deadline = start + maxWorkTime;
346
347         LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
348
349         long messages = 0;
350         for (;; ++messages) {
351             if (!parent.getChannel().isWritable()) {
352                 LOG.debug("Channel {} is no longer writable", parent.getChannel());
353                 break;
354             }
355
356             final OfHeader message = flushEntry(start);
357             if (message == null) {
358                 LOG.trace("The queue is completely drained");
359                 break;
360             }
361
362             final Object wrapper;
363             if (address == null) {
364                 wrapper = new MessageListenerWrapper(message, null);
365             } else {
366                 wrapper = new UdpMessageListenerWrapper(message, null, address);
367             }
368             parent.getChannel().write(wrapper);
369
370             /*
371              * Check every WORKTIME_RECHECK_MSGS for exceeded time.
372              *
373              * XXX: given we already measure our flushing throughput, we
374              *      should be able to perform dynamic adjustments here.
375              *      is that additional complexity needed, though?
376              */
377             if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
378                 LOG.trace("Exceeded allotted work time {}us",
379                         TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
380                 break;
381             }
382         }
383
384         if (messages > 0) {
385             LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
386             parent.getChannel().flush();
387         }
388
389         final long stop = System.nanoTime();
390         LOG.debug("Flushed {} messages in {}us to channel {}",
391                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
392
393         rescheduleFlush();
394     }
395
396     /**
397      * Schedule a queue flush if it is not empty and the channel is found
398      * to be writable. May only be called from Netty context.
399      */
400     private void conditionalFlush() {
401         if (currentQueue.needsFlush()) {
402             if (shutdownOffset != null || parent.getChannel().isWritable()) {
403                 scheduleFlush();
404             } else {
405                 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
406             }
407         } else {
408             LOG.trace("Queue is empty, no flush needed");
409         }
410     }
411
412     private void conditionalFlush(final ChannelHandlerContext ctx) {
413         Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
414         conditionalFlush();
415     }
416
417     @Override
418     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
419         super.channelActive(ctx);
420         conditionalFlush(ctx);
421     }
422
423     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
424         /*
425          * Tune channel write buffering. We increase the writability window
426          * to ensure we can flush an entire queue segment in one go. We definitely
427          * want to keep the difference above 64k, as that will ensure we use jam-packed
428          * TCP packets. UDP will fragment as appropriate.
429          */
430         ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
431         ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
432
433         super.handlerAdded(ctx);
434     }
435
436     @Override
437     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
438         super.channelWritabilityChanged(ctx);
439
440         if (flushScheduled.compareAndSet(false, true)) {
441             LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
442             flush();
443         } else {
444             LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
445         }
446     }
447
448     @Override
449     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
450         super.channelInactive(ctx);
451
452         LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
453
454         /*
455          * We are dealing with a multi-threaded shutdown, as the user may still
456          * be reserving entries in the queue. We are executing in a netty thread,
457          * so neither flush nor barrier can be running, which is good news.
458          *
459          * We will eat up all the slots in the queue here and mark the offset first
460          * reserved offset and free up all the cached queues. We then schedule
461          * the flush task, which will deal with the rest of the shutdown process.
462          */
463         shutdownOffset = currentQueue.startShutdown();
464         if (slice != null) {
465             slice.decRef();
466             slice = null;
467         }
468
469         LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
470         scheduleFlush();
471     }
472
473     @Override
474     public String toString() {
475         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
476     }
477
478     void onEchoRequest(final EchoRequestMessage message) {
479         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
480         parent.getChannel().writeAndFlush(reply);
481     }
482 }