BUG-3219: Fix flush task scheduling
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelInboundHandlerAdapter;
13 import java.net.InetSocketAddress;
14 import java.util.ArrayDeque;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.Queue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicBoolean;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
21 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
28     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
29
30     /**
31      * This is the default upper bound we place on the flush task running
32      * a single iteration. We relinquish control after about this amount
33      * of time.
34      */
35     private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
36
37     /**
38      * We re-check the time spent flushing every this many messages. We do this because
39      * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
40      * or similar to disable the feature.
41      */
42     private static final int WORKTIME_RECHECK_MSGS = 64;
43
44     /**
45      * We maintain a cache of this many previous queues for later reuse.
46      */
47     private static final int QUEUE_CACHE_SIZE = 4;
48
49     private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
50     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
51     private final AtomicBoolean flushScheduled = new AtomicBoolean();
52     private final ConnectionAdapterImpl parent;
53     private final InetSocketAddress address;
54     private final long maxBarrierNanos;
55     private final long maxWorkTime;
56     private final int queueSize;
57     private final T handler;
58
59     // Updated from netty only
60     private long lastBarrierNanos = System.nanoTime();
61     private OutboundQueueImpl currentQueue;
62     private boolean barrierTimerEnabled;
63     private int nonBarrierMessages;
64     private long lastXid = 0;
65     private Integer shutdownOffset;
66
67     // Passed to executor to request triggering of flush
68     private final Runnable flushRunnable = new Runnable() {
69         @Override
70         public void run() {
71             flush();
72         }
73     };
74
75     // Passed to executor to request a periodic barrier check
76     private final Runnable barrierRunnable = new Runnable() {
77         @Override
78         public void run() {
79             barrier();
80         }
81     };
82
83     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
84         final int queueSize, final long maxBarrierNanos) {
85         this.parent = Preconditions.checkNotNull(parent);
86         this.handler = Preconditions.checkNotNull(handler);
87         Preconditions.checkArgument(queueSize > 0);
88         this.queueSize = queueSize;
89         Preconditions.checkArgument(maxBarrierNanos > 0);
90         this.maxBarrierNanos = maxBarrierNanos;
91         this.address = address;
92         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
93
94         LOG.debug("Queue manager instantiated with queue size {}", queueSize);
95         createQueue();
96     }
97
98     T getHandler() {
99         return handler;
100     }
101
102     @Override
103     public void close() {
104         handler.onConnectionQueueChanged(null);
105     }
106
107     private void retireQueue(final OutboundQueueImpl queue) {
108         if (queueCache.offer(queue)) {
109             LOG.trace("Saving queue {} for later reuse", queue);
110         } else {
111             LOG.trace("Queue {} thrown away", queue);
112         }
113     }
114
115     private void createQueue() {
116         final long baseXid = lastXid;
117         lastXid += queueSize + 1;
118
119         final OutboundQueueImpl cached = queueCache.poll();
120         final OutboundQueueImpl queue;
121         if (cached != null) {
122             queue = cached.reuse(baseXid);
123             LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
124         } else {
125             queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
126             LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
127         }
128
129         activeQueues.add(queue);
130         currentQueue = queue;
131         handler.onConnectionQueueChanged(queue);
132     }
133
134     private void scheduleBarrierTimer(final long now) {
135         long next = lastBarrierNanos + maxBarrierNanos;
136         if (next < now) {
137             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
138             next = now + maxBarrierNanos;
139         }
140
141         final long delay = next - now;
142         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
143         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
144         barrierTimerEnabled = true;
145     }
146
147     private void scheduleBarrierMessage() {
148         final Long xid = currentQueue.reserveBarrierIfNeeded();
149         if (xid == null) {
150             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
151             return;
152         }
153
154         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
155         LOG.trace("Barrier XID {} scheduled", xid);
156     }
157
158     /**
159      * Flush an entry from the queue.
160      *
161      * @param now Time reference for 'now'. We take this as an argument, as
162      *            we need a timestamp to mark barrier messages we see swinging
163      *            by. That timestamp does not need to be completely accurate,
164      *            hence we use the flush start time. Alternative would be to
165      *            measure System.nanoTime() for each barrier -- needlessly
166      *            adding overhead.
167      *
168      * @return Entry which was flushed, null if no entry is ready.
169      */
170     OfHeader flushEntry(final long now) {
171         final OfHeader message = currentQueue.flushEntry();
172         if (currentQueue.isFlushed()) {
173             LOG.debug("Queue {} is fully flushed", currentQueue);
174             createQueue();
175         }
176
177         if (message == null) {
178             return null;
179         }
180
181         if (message instanceof BarrierInput) {
182             LOG.trace("Barrier message seen, resetting counters");
183             nonBarrierMessages = 0;
184             lastBarrierNanos = now;
185         } else {
186             nonBarrierMessages++;
187             if (nonBarrierMessages >= queueSize) {
188                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
189                 scheduleBarrierMessage();
190             } else if (!barrierTimerEnabled) {
191                 scheduleBarrierTimer(now);
192             }
193         }
194
195         return message;
196     }
197
198     /**
199      * Invoked whenever a message comes in from the switch. Runs matching
200      * on all active queues in an attempt to complete a previous request.
201      *
202      * @param message Potential response message
203      * @return True if the message matched a previous request, false otherwise.
204      */
205     boolean onMessage(final OfHeader message) {
206         LOG.trace("Attempting to pair message {} to a request", message);
207
208         Iterator<OutboundQueueImpl> it = activeQueues.iterator();
209         while (it.hasNext()) {
210             final OutboundQueueImpl queue = it.next();
211             final OutboundQueueEntry entry = queue.pairRequest(message);
212
213             if (entry == null) {
214                 continue;
215             }
216
217             LOG.trace("Queue {} accepted response {}", queue, message);
218
219             // This has been a barrier request, we need to flush all
220             // previous queues
221             if (entry.isBarrier() && activeQueues.size() > 1) {
222                 LOG.trace("Queue {} indicated request was a barrier", queue);
223
224                 it = activeQueues.iterator();
225                 while (it.hasNext()) {
226                     final OutboundQueueImpl q = it.next();
227
228                     // We want to complete all queues before the current one, we will
229                     // complete the current queue below
230                     if (!queue.equals(q)) {
231                         LOG.trace("Queue {} is implied finished", q);
232                         q.completeAll();
233                         it.remove();
234                         retireQueue(q);
235                     } else {
236                         break;
237                     }
238                 }
239             }
240
241             if (queue.isFinished()) {
242                 LOG.trace("Queue {} is finished", queue);
243                 it.remove();
244                 retireQueue(queue);
245             }
246
247             return true;
248         }
249
250         LOG.debug("Failed to find completion for message {}", message);
251         return false;
252     }
253
254     private void scheduleFlush() {
255         if (flushScheduled.compareAndSet(false, true)) {
256             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
257             parent.getChannel().eventLoop().execute(flushRunnable);
258         } else {
259             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
260         }
261     }
262
263     void ensureFlushing(final OutboundQueueImpl queue) {
264         Preconditions.checkState(currentQueue.equals(queue));
265         scheduleFlush();
266     }
267
268     /**
269      * Periodic barrier check.
270      */
271     protected void barrier() {
272         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
273         barrierTimerEnabled = false;
274         if (shutdownOffset != null) {
275             LOG.trace("Channel shut down, not processing barrier");
276             return;
277         }
278
279         final long now = System.nanoTime();
280         final long sinceLast = now - lastBarrierNanos;
281         if (sinceLast >= maxBarrierNanos) {
282             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
283             // FIXME: we should be tracking requests/responses instead of this
284             if (nonBarrierMessages == 0) {
285                 LOG.trace("No messages written since last barrier, not issuing one");
286             } else {
287                 scheduleBarrierMessage();
288             }
289         }
290     }
291
292     private void rescheduleFlush() {
293         /*
294          * We are almost ready to terminate. This is a bit tricky, because
295          * we do not want to have a race window where a message would be
296          * stuck on the queue without a flush being scheduled.
297          *
298          * So we mark ourselves as not running and then re-check if a
299          * flush out is needed. That will re-synchronized with other threads
300          * such that only one flush is scheduled at any given time.
301          */
302         if (!flushScheduled.compareAndSet(true, false)) {
303             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
304         }
305
306         conditionalFlush();
307     }
308
309     private void shutdownFlush() {
310         long entries = 0;
311
312         // Fail all queues
313         final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
314         while (it.hasNext()) {
315             final OutboundQueueImpl queue = it.next();
316
317             entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
318             if (queue.isFinished()) {
319                 LOG.trace("Cleared queue {}", queue);
320                 it.remove();
321             }
322         }
323
324         LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
325
326         Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
327         if (currentQueue.isShutdown(shutdownOffset)) {
328             currentQueue = null;
329             handler.onConnectionQueueChanged(null);
330             LOG.debug("Channel {} shutdown complete", parent.getChannel());
331         } else {
332             LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
333             rescheduleFlush();
334         }
335     }
336
337     /**
338      * Perform a single flush operation.
339      */
340     protected void flush() {
341         // If the channel is gone, just flush whatever is not completed
342         if (shutdownOffset != null) {
343             shutdownFlush();
344             return;
345         }
346
347         final long start = System.nanoTime();
348         final long deadline = start + maxWorkTime;
349
350         LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
351
352         long messages = 0;
353         for (;; ++messages) {
354             if (!parent.getChannel().isWritable()) {
355                 LOG.trace("Channel is no longer writable");
356                 break;
357             }
358
359             final OfHeader message = flushEntry(start);
360             if (message == null) {
361                 LOG.trace("The queue is completely drained");
362                 break;
363             }
364
365             final Object wrapper;
366             if (address == null) {
367                 wrapper = new MessageListenerWrapper(message, null);
368             } else {
369                 wrapper = new UdpMessageListenerWrapper(message, null, address);
370             }
371             parent.getChannel().write(wrapper);
372
373             /*
374              * Check every WORKTIME_RECHECK_MSGS for exceeded time.
375              *
376              * XXX: given we already measure our flushing throughput, we
377              *      should be able to perform dynamic adjustments here.
378              *      is that additional complexity needed, though?
379              */
380             if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
381                 LOG.trace("Exceeded allotted work time {}us",
382                         TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
383                 break;
384             }
385         }
386
387         if (messages > 0) {
388             LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
389             parent.getChannel().flush();
390         }
391
392         final long stop = System.nanoTime();
393         LOG.debug("Flushed {} messages in {}us to channel {}",
394                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
395
396         rescheduleFlush();
397     }
398
399     /**
400      * Schedule a queue flush if it is not empty and the channel is found
401      * to be writable. May only be called from Netty context.
402      */
403     private void conditionalFlush() {
404         if (currentQueue.needsFlush()) {
405             scheduleFlush();
406         } else {
407             LOG.trace("Queue is empty, no flush needed");
408         }
409     }
410
411     private void conditionalFlush(final ChannelHandlerContext ctx) {
412         Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
413         conditionalFlush();
414     }
415
416     @Override
417     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
418         super.channelActive(ctx);
419         conditionalFlush(ctx);
420     }
421
422     @Override
423     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
424         super.channelWritabilityChanged(ctx);
425         conditionalFlush(ctx);
426     }
427
428     @Override
429     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
430         super.channelInactive(ctx);
431
432         LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
433
434         /*
435          * We are dealing with a multi-threaded shutdown, as the user may still
436          * be reserving entries in the queue. We are executing in a netty thread,
437          * so neither flush nor barrier can be running, which is good news.
438          *
439          * We will eat up all the slots in the queue here and mark the offset first
440          * reserved offset and free up all the cached queues. We then schedule
441          * the flush task, which will deal with the rest of the shutdown process.
442          */
443         shutdownOffset = currentQueue.startShutdown();
444         queueCache.clear();
445         LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
446         scheduleFlush();
447     }
448
449     @Override
450     public String toString() {
451         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
452     }
453 }