BUG-3219: clear queue array before stashing
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelInboundHandlerAdapter;
13 import java.net.InetSocketAddress;
14 import java.util.ArrayDeque;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.Queue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicBoolean;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
21 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
31     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
32
33     /**
34      * This is the default upper bound we place on the flush task running
35      * a single iteration. We relinquish control after about this amount
36      * of time.
37      */
38     private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
39
40     /**
41      * We re-check the time spent flushing every this many messages. We do this because
42      * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
43      * or similar to disable the feature.
44      */
45     private static final int WORKTIME_RECHECK_MSGS = 64;
46
47     /**
48      * We maintain a cache of this many previous queues for later reuse.
49      */
50     private static final int QUEUE_CACHE_SIZE = 4;
51
52     private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
53     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
54     private final AtomicBoolean flushScheduled = new AtomicBoolean();
55     private final ConnectionAdapterImpl parent;
56     private final InetSocketAddress address;
57     private final long maxBarrierNanos;
58     private final long maxWorkTime;
59     private final int queueSize;
60     private final T handler;
61
62     // Updated from netty only
63     private long lastBarrierNanos = System.nanoTime();
64     private OutboundQueueImpl currentQueue;
65     private boolean barrierTimerEnabled;
66     private int nonBarrierMessages;
67     private long lastXid = 0;
68     private Integer shutdownOffset;
69
70     // Passed to executor to request triggering of flush
71     private final Runnable flushRunnable = new Runnable() {
72         @Override
73         public void run() {
74             flush();
75         }
76     };
77
78     // Passed to executor to request a periodic barrier check
79     private final Runnable barrierRunnable = new Runnable() {
80         @Override
81         public void run() {
82             barrier();
83         }
84     };
85
86     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
87         final int queueSize, final long maxBarrierNanos) {
88         this.parent = Preconditions.checkNotNull(parent);
89         this.handler = Preconditions.checkNotNull(handler);
90         Preconditions.checkArgument(queueSize > 0);
91         this.queueSize = queueSize;
92         Preconditions.checkArgument(maxBarrierNanos > 0);
93         this.maxBarrierNanos = maxBarrierNanos;
94         this.address = address;
95         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
96
97         LOG.debug("Queue manager instantiated with queue size {}", queueSize);
98         createQueue();
99     }
100
101     T getHandler() {
102         return handler;
103     }
104
105     @Override
106     public void close() {
107         handler.onConnectionQueueChanged(null);
108     }
109
110     private void retireQueue(final OutboundQueueImpl queue) {
111         if (queueCache.offer(queue)) {
112             queue.retire();
113             LOG.trace("Saving queue {} for later reuse", queue);
114         } else {
115             LOG.trace("Queue {} thrown away", queue);
116         }
117     }
118
119     private void createQueue() {
120         final long baseXid = lastXid;
121         lastXid += queueSize + 1;
122
123         final OutboundQueueImpl cached = queueCache.poll();
124         final OutboundQueueImpl queue;
125         if (cached != null) {
126             queue = cached.reuse(baseXid);
127             LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
128         } else {
129             queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
130             LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
131         }
132
133         activeQueues.add(queue);
134         currentQueue = queue;
135         handler.onConnectionQueueChanged(queue);
136     }
137
138     private void scheduleBarrierTimer(final long now) {
139         long next = lastBarrierNanos + maxBarrierNanos;
140         if (next < now) {
141             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
142             next = now + maxBarrierNanos;
143         }
144
145         final long delay = next - now;
146         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
147         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
148         barrierTimerEnabled = true;
149     }
150
151     private void scheduleBarrierMessage() {
152         final Long xid = currentQueue.reserveBarrierIfNeeded();
153         if (xid == null) {
154             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
155             return;
156         }
157
158         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
159         LOG.trace("Barrier XID {} scheduled", xid);
160     }
161
162     /**
163      * Flush an entry from the queue.
164      *
165      * @param now Time reference for 'now'. We take this as an argument, as
166      *            we need a timestamp to mark barrier messages we see swinging
167      *            by. That timestamp does not need to be completely accurate,
168      *            hence we use the flush start time. Alternative would be to
169      *            measure System.nanoTime() for each barrier -- needlessly
170      *            adding overhead.
171      *
172      * @return Entry which was flushed, null if no entry is ready.
173      */
174     OfHeader flushEntry(final long now) {
175         final OfHeader message = currentQueue.flushEntry();
176         if (currentQueue.isFlushed()) {
177             LOG.debug("Queue {} is fully flushed", currentQueue);
178             createQueue();
179         }
180
181         if (message == null) {
182             return null;
183         }
184
185         if (message instanceof BarrierInput) {
186             LOG.trace("Barrier message seen, resetting counters");
187             nonBarrierMessages = 0;
188             lastBarrierNanos = now;
189         } else {
190             nonBarrierMessages++;
191             if (nonBarrierMessages >= queueSize) {
192                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
193                 scheduleBarrierMessage();
194             } else if (!barrierTimerEnabled) {
195                 scheduleBarrierTimer(now);
196             }
197         }
198
199         return message;
200     }
201
202     /**
203      * Invoked whenever a message comes in from the switch. Runs matching
204      * on all active queues in an attempt to complete a previous request.
205      *
206      * @param message Potential response message
207      * @return True if the message matched a previous request, false otherwise.
208      */
209     boolean onMessage(final OfHeader message) {
210         LOG.trace("Attempting to pair message {} to a request", message);
211
212         Iterator<OutboundQueueImpl> it = activeQueues.iterator();
213         while (it.hasNext()) {
214             final OutboundQueueImpl queue = it.next();
215             final OutboundQueueEntry entry = queue.pairRequest(message);
216
217             if (entry == null) {
218                 continue;
219             }
220
221             LOG.trace("Queue {} accepted response {}", queue, message);
222
223             // This has been a barrier request, we need to flush all
224             // previous queues
225             if (entry.isBarrier() && activeQueues.size() > 1) {
226                 LOG.trace("Queue {} indicated request was a barrier", queue);
227
228                 it = activeQueues.iterator();
229                 while (it.hasNext()) {
230                     final OutboundQueueImpl q = it.next();
231
232                     // We want to complete all queues before the current one, we will
233                     // complete the current queue below
234                     if (!queue.equals(q)) {
235                         LOG.trace("Queue {} is implied finished", q);
236                         q.completeAll();
237                         it.remove();
238                         retireQueue(q);
239                     } else {
240                         break;
241                     }
242                 }
243             }
244
245             if (queue.isFinished()) {
246                 LOG.trace("Queue {} is finished", queue);
247                 it.remove();
248                 retireQueue(queue);
249             }
250
251             return true;
252         }
253
254         LOG.debug("Failed to find completion for message {}", message);
255         return false;
256     }
257
258     private void scheduleFlush() {
259         if (flushScheduled.compareAndSet(false, true)) {
260             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
261             parent.getChannel().eventLoop().execute(flushRunnable);
262         } else {
263             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
264         }
265     }
266
267     void ensureFlushing(final OutboundQueueImpl queue) {
268         Preconditions.checkState(currentQueue.equals(queue));
269         scheduleFlush();
270     }
271
272     /**
273      * Periodic barrier check.
274      */
275     protected void barrier() {
276         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
277         barrierTimerEnabled = false;
278         if (shutdownOffset != null) {
279             LOG.trace("Channel shut down, not processing barrier");
280             return;
281         }
282
283         final long now = System.nanoTime();
284         final long sinceLast = now - lastBarrierNanos;
285         if (sinceLast >= maxBarrierNanos) {
286             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
287             // FIXME: we should be tracking requests/responses instead of this
288             if (nonBarrierMessages == 0) {
289                 LOG.trace("No messages written since last barrier, not issuing one");
290             } else {
291                 scheduleBarrierMessage();
292             }
293         }
294     }
295
296     private void rescheduleFlush() {
297         /*
298          * We are almost ready to terminate. This is a bit tricky, because
299          * we do not want to have a race window where a message would be
300          * stuck on the queue without a flush being scheduled.
301          *
302          * So we mark ourselves as not running and then re-check if a
303          * flush out is needed. That will re-synchronized with other threads
304          * such that only one flush is scheduled at any given time.
305          */
306         if (!flushScheduled.compareAndSet(true, false)) {
307             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
308         }
309
310         conditionalFlush();
311     }
312
313     private void shutdownFlush() {
314         long entries = 0;
315
316         // Fail all queues
317         final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
318         while (it.hasNext()) {
319             final OutboundQueueImpl queue = it.next();
320
321             entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
322             if (queue.isFinished()) {
323                 LOG.trace("Cleared queue {}", queue);
324                 it.remove();
325             }
326         }
327
328         LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
329
330         Preconditions.checkNotNull(currentQueue, "Current queue should not be null yet");
331         if (currentQueue.isShutdown(shutdownOffset)) {
332             currentQueue = null;
333             handler.onConnectionQueueChanged(null);
334             LOG.debug("Channel {} shutdown complete", parent.getChannel());
335         } else {
336             LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
337             rescheduleFlush();
338         }
339     }
340
341     /**
342      * Perform a single flush operation.
343      */
344     protected void flush() {
345         // If the channel is gone, just flush whatever is not completed
346         if (shutdownOffset != null) {
347             shutdownFlush();
348             return;
349         }
350
351         final long start = System.nanoTime();
352         final long deadline = start + maxWorkTime;
353
354         LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
355
356         long messages = 0;
357         for (;; ++messages) {
358             if (!parent.getChannel().isWritable()) {
359                 LOG.trace("Channel is no longer writable");
360                 break;
361             }
362
363             final OfHeader message = flushEntry(start);
364             if (message == null) {
365                 LOG.trace("The queue is completely drained");
366                 break;
367             }
368
369             final Object wrapper;
370             if (address == null) {
371                 wrapper = new MessageListenerWrapper(message, null);
372             } else {
373                 wrapper = new UdpMessageListenerWrapper(message, null, address);
374             }
375             parent.getChannel().write(wrapper);
376
377             /*
378              * Check every WORKTIME_RECHECK_MSGS for exceeded time.
379              *
380              * XXX: given we already measure our flushing throughput, we
381              *      should be able to perform dynamic adjustments here.
382              *      is that additional complexity needed, though?
383              */
384             if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
385                 LOG.trace("Exceeded allotted work time {}us",
386                         TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
387                 break;
388             }
389         }
390
391         if (messages > 0) {
392             LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
393             parent.getChannel().flush();
394         }
395
396         final long stop = System.nanoTime();
397         LOG.debug("Flushed {} messages in {}us to channel {}",
398                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
399
400         rescheduleFlush();
401     }
402
403     /**
404      * Schedule a queue flush if it is not empty and the channel is found
405      * to be writable. May only be called from Netty context.
406      */
407     private void conditionalFlush() {
408         if (currentQueue.needsFlush() && (shutdownOffset != null || parent.getChannel().isWritable())) {
409             scheduleFlush();
410         } else {
411             LOG.trace("Queue is empty, no flush needed");
412         }
413     }
414
415     private void conditionalFlush(final ChannelHandlerContext ctx) {
416         Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
417         conditionalFlush();
418     }
419
420     @Override
421     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
422         super.channelActive(ctx);
423         conditionalFlush(ctx);
424     }
425
426     @Override
427     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
428         super.channelWritabilityChanged(ctx);
429         conditionalFlush(ctx);
430     }
431
432     @Override
433     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
434         super.channelInactive(ctx);
435
436         LOG.debug("Channel {} initiating shutdown...", parent.getChannel());
437
438         /*
439          * We are dealing with a multi-threaded shutdown, as the user may still
440          * be reserving entries in the queue. We are executing in a netty thread,
441          * so neither flush nor barrier can be running, which is good news.
442          *
443          * We will eat up all the slots in the queue here and mark the offset first
444          * reserved offset and free up all the cached queues. We then schedule
445          * the flush task, which will deal with the rest of the shutdown process.
446          */
447         shutdownOffset = currentQueue.startShutdown();
448         queueCache.clear();
449         LOG.trace("Channel {} reserved all entries at offset {}", parent.getChannel(), shutdownOffset);
450         scheduleFlush();
451     }
452
453     @Override
454     public String toString() {
455         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
456     }
457
458     void onEchoRequest(final EchoRequestMessage message) {
459         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
460         parent.getChannel().writeAndFlush(reply);
461     }
462 }