Run flush immediately when channel becomes writable
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelInboundHandlerAdapter;
13 import java.net.InetSocketAddress;
14 import java.util.Iterator;
15 import java.util.LinkedList;
16 import java.util.Queue;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
30     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
31
32     /**
33      * This is the default upper bound we place on the flush task running
34      * a single iteration. We relinquish control after about this amount
35      * of time.
36      */
37     private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
38
39     /**
40      * We re-check the time spent flushing every this many messages. We do this because
41      * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
42      * or similar to disable the feature.
43      */
44     private static final int WORKTIME_RECHECK_MSGS = 64;
45
46     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
47     private final AtomicBoolean flushScheduled = new AtomicBoolean();
48     private final ConnectionAdapterImpl parent;
49     private final InetSocketAddress address;
50     private final long maxBarrierNanos;
51     private final long maxWorkTime;
52     private final T handler;
53
54     // Updated from netty only
55     private long lastBarrierNanos = System.nanoTime();
56     private OutboundQueueCacheSlice slice;
57     private OutboundQueueImpl currentQueue;
58     private boolean barrierTimerEnabled;
59     private int nonBarrierMessages;
60     private long lastXid = 0;
61     private Integer shutdownOffset;
62
63     // Passed to executor to request triggering of flush
64     private final Runnable flushRunnable = new Runnable() {
65         @Override
66         public void run() {
67             flush();
68         }
69     };
70
71     // Passed to executor to request a periodic barrier check
72     private final Runnable barrierRunnable = new Runnable() {
73         @Override
74         public void run() {
75             barrier();
76         }
77     };
78
79     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
80         final OutboundQueueCacheSlice slice, final long maxBarrierNanos) {
81         this.parent = Preconditions.checkNotNull(parent);
82         this.handler = Preconditions.checkNotNull(handler);
83         this.slice = Preconditions.checkNotNull(slice);
84         Preconditions.checkArgument(maxBarrierNanos > 0);
85         this.maxBarrierNanos = maxBarrierNanos;
86         this.address = address;
87         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
88
89         LOG.debug("Queue manager instantiated with queue slice {}", slice);
90         createQueue();
91     }
92
93     T getHandler() {
94         return handler;
95     }
96
97     @Override
98     public void close() {
99         handler.onConnectionQueueChanged(null);
100         if (slice != null) {
101             slice.decRef();
102             slice = null;
103         }
104     }
105
106     private void createQueue() {
107         final long baseXid = lastXid;
108         lastXid += slice.getQueueSize() + 1;
109
110         final OutboundQueueImpl queue = slice.getQueue(this, baseXid);
111         activeQueues.add(queue);
112         currentQueue = queue;
113         handler.onConnectionQueueChanged(queue);
114     }
115
116     private void scheduleBarrierTimer(final long now) {
117         long next = lastBarrierNanos + maxBarrierNanos;
118         if (next < now) {
119             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
120             next = now + maxBarrierNanos;
121         }
122
123         final long delay = next - now;
124         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
125         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
126         barrierTimerEnabled = true;
127     }
128
129     private void scheduleBarrierMessage() {
130         final Long xid = currentQueue.reserveBarrierIfNeeded();
131         if (xid == null) {
132             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
133             return;
134         }
135
136         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
137         LOG.trace("Barrier XID {} scheduled", xid);
138     }
139
140     /**
141      * Flush an entry from the queue.
142      *
143      * @param now Time reference for 'now'. We take this as an argument, as
144      *            we need a timestamp to mark barrier messages we see swinging
145      *            by. That timestamp does not need to be completely accurate,
146      *            hence we use the flush start time. Alternative would be to
147      *            measure System.nanoTime() for each barrier -- needlessly
148      *            adding overhead.
149      *
150      * @return Entry which was flushed, null if no entry is ready.
151      */
152     OfHeader flushEntry(final long now) {
153         final OfHeader message = currentQueue.flushEntry();
154         if (currentQueue.isFlushed()) {
155             LOG.debug("Queue {} is fully flushed", currentQueue);
156             createQueue();
157         }
158
159         if (message == null) {
160             return null;
161         }
162
163         if (message instanceof BarrierInput) {
164             LOG.trace("Barrier message seen, resetting counters");
165             nonBarrierMessages = 0;
166             lastBarrierNanos = now;
167         } else {
168             nonBarrierMessages++;
169             if (nonBarrierMessages >= slice.getQueueSize()) {
170                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
171                 scheduleBarrierMessage();
172             } else if (!barrierTimerEnabled) {
173                 scheduleBarrierTimer(now);
174             }
175         }
176
177         return message;
178     }
179
180     /**
181      * Invoked whenever a message comes in from the switch. Runs matching
182      * on all active queues in an attempt to complete a previous request.
183      *
184      * @param message Potential response message
185      * @return True if the message matched a previous request, false otherwise.
186      */
187     boolean onMessage(final OfHeader message) {
188         LOG.trace("Attempting to pair message {} to a request", message);
189
190         Iterator<OutboundQueueImpl> it = activeQueues.iterator();
191         while (it.hasNext()) {
192             final OutboundQueueImpl queue = it.next();
193             final OutboundQueueEntry entry = queue.pairRequest(message);
194
195             if (entry == null) {
196                 continue;
197             }
198
199             LOG.trace("Queue {} accepted response {}", queue, message);
200
201             // This has been a barrier request, we need to flush all
202             // previous queues
203             if (entry.isBarrier() && activeQueues.size() > 1) {
204                 LOG.trace("Queue {} indicated request was a barrier", queue);
205
206                 it = activeQueues.iterator();
207                 while (it.hasNext()) {
208                     final OutboundQueueImpl q = it.next();
209
210                     // We want to complete all queues before the current one, we will
211                     // complete the current queue below
212                     if (!queue.equals(q)) {
213                         LOG.trace("Queue {} is implied finished", q);
214                         q.completeAll();
215                         it.remove();
216                         slice.putQueue(q);
217                     } else {
218                         break;
219                     }
220                 }
221             }
222
223             if (queue.isFinished()) {
224                 LOG.trace("Queue {} is finished", queue);
225                 it.remove();
226                 slice.putQueue(queue);
227             }
228
229             return true;
230         }
231
232         LOG.debug("Failed to find completion for message {}", message);
233         return false;
234     }
235
236     private void scheduleFlush() {
237         if (flushScheduled.compareAndSet(false, true)) {
238             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
239             parent.getChannel().eventLoop().execute(flushRunnable);
240         } else {
241             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
242         }
243     }
244
245     void ensureFlushing(final OutboundQueueImpl queue) {
246         Preconditions.checkState(currentQueue.equals(queue));
247         scheduleFlush();
248     }
249
250     /**
251      * Periodic barrier check.
252      */
253     protected void barrier() {
254         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
255         barrierTimerEnabled = false;
256         if (shutdownOffset != null) {
257             LOG.trace("Channel shut down, not processing barrier");
258             return;
259         }
260
261         final long now = System.nanoTime();
262         final long sinceLast = now - lastBarrierNanos;
263         if (sinceLast >= maxBarrierNanos) {
264             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
265             // FIXME: we should be tracking requests/responses instead of this
266             if (nonBarrierMessages == 0) {
267                 LOG.trace("No messages written since last barrier, not issuing one");
268             } else {
269                 scheduleBarrierMessage();
270             }
271         }
272     }
273
274     private void rescheduleFlush() {
275         /*
276          * We are almost ready to terminate. This is a bit tricky, because
277          * we do not want to have a race window where a message would be
278          * stuck on the queue without a flush being scheduled.
279          *
280          * So we mark ourselves as not running and then re-check if a
281          * flush out is needed. That will re-synchronized with other threads
282          * such that only one flush is scheduled at any given time.
283          */
284         if (!flushScheduled.compareAndSet(true, false)) {
285             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
286         }
287
288         conditionalFlush();
289     }
290
291     private void shutdownFlush() {
292         long entries = 0;
293
294         // Fail all queues
295         final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
296         while (it.hasNext()) {
297             final OutboundQueueImpl queue = it.next();
298
299             entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
300             if (queue.isFinished()) {
301                 LOG.trace("Cleared queue {}", queue);
302                 it.remove();
303             }
304         }
305
306         LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
307
308         Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
309         if (currentQueue.isShutdown(shutdownOffset)) {
310             currentQueue = null;
311             handler.onConnectionQueueChanged(null);
312             LOG.debug("Channel {} shutdown complete", parent.getChannel());
313         } else {
314             LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
315             rescheduleFlush();
316         }
317     }
318
319     /**
320      * Perform a single flush operation.
321      */
322     protected void flush() {
323         // If the channel is gone, just flush whatever is not completed
324         if (shutdownOffset != null) {
325             shutdownFlush();
326             return;
327         }
328
329         final long start = System.nanoTime();
330         final long deadline = start + maxWorkTime;
331
332         LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
333
334         long messages = 0;
335         for (;; ++messages) {
336             if (!parent.getChannel().isWritable()) {
337                 LOG.debug("Channel {} is no longer writable", parent.getChannel());
338                 break;
339             }
340
341             final OfHeader message = flushEntry(start);
342             if (message == null) {
343                 LOG.trace("The queue is completely drained");
344                 break;
345             }
346
347             final Object wrapper;
348             if (address == null) {
349                 wrapper = new MessageListenerWrapper(message, null);
350             } else {
351                 wrapper = new UdpMessageListenerWrapper(message, null, address);
352             }
353             parent.getChannel().write(wrapper);
354
355             /*
356              * Check every WORKTIME_RECHECK_MSGS for exceeded time.
357              *
358              * XXX: given we already measure our flushing throughput, we
359              *      should be able to perform dynamic adjustments here.
360              *      is that additional complexity needed, though?
361              */
362             if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
363                 LOG.trace("Exceeded allotted work time {}us",
364                         TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
365                 break;
366             }
367         }
368
369         if (messages > 0) {
370             LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
371             parent.getChannel().flush();
372         }
373
374         final long stop = System.nanoTime();
375         LOG.debug("Flushed {} messages in {}us to channel {}",
376                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
377
378         rescheduleFlush();
379     }
380
381     /**
382      * Schedule a queue flush if it is not empty and the channel is found
383      * to be writable. May only be called from Netty context.
384      */
385     private void conditionalFlush() {
386         if (currentQueue.needsFlush()) {
387             if (shutdownOffset != null || parent.getChannel().isWritable()) {
388                 scheduleFlush();
389             } else {
390                 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
391             }
392         } else {
393             LOG.trace("Queue is empty, no flush needed");
394         }
395     }
396
397     private void conditionalFlush(final ChannelHandlerContext ctx) {
398         Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
399         conditionalFlush();
400     }
401
402     @Override
403     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
404         super.channelActive(ctx);
405         conditionalFlush(ctx);
406     }
407
408     @Override
409     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
410         super.channelWritabilityChanged(ctx);
411
412         if (flushScheduled.compareAndSet(false, true)) {
413             LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
414             flush();
415         } else {
416             LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
417         }
418     }
419
420     @Override
421     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
422         super.channelInactive(ctx);
423
424         LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
425
426         /*
427          * We are dealing with a multi-threaded shutdown, as the user may still
428          * be reserving entries in the queue. We are executing in a netty thread,
429          * so neither flush nor barrier can be running, which is good news.
430          *
431          * We will eat up all the slots in the queue here and mark the offset first
432          * reserved offset and free up all the cached queues. We then schedule
433          * the flush task, which will deal with the rest of the shutdown process.
434          */
435         shutdownOffset = currentQueue.startShutdown();
436         if (slice != null) {
437             slice.decRef();
438             slice = null;
439         }
440
441         LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
442         scheduleFlush();
443     }
444
445     @Override
446     public String toString() {
447         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
448     }
449
450     void onEchoRequest(final EchoRequestMessage message) {
451         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
452         parent.getChannel().writeAndFlush(reply);
453     }
454 }