Cleanup queue logging
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelInboundHandlerAdapter;
13 import java.net.InetSocketAddress;
14 import java.util.ArrayDeque;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.Queue;
18 import java.util.concurrent.RejectedExecutionException;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
28     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
29
30     /**
31      * This is the default upper bound we place on the flush task running
32      * a single iteration. We relinquish control after about this amount
33      * of time.
34      */
35     private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
36
37     /**
38      * We re-check the time spent flushing every this many messages. We do this because
39      * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
40      * or similar to disable the feature.
41      */
42     private static final int WORKTIME_RECHECK_MSGS = 64;
43
44     /**
45      * We maintain a cache of this many previous queues for later reuse.
46      */
47     private static final int QUEUE_CACHE_SIZE = 4;
48
49     private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
50     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
51     private final AtomicBoolean flushScheduled = new AtomicBoolean();
52     private final ConnectionAdapterImpl parent;
53     private final InetSocketAddress address;
54     private final long maxBarrierNanos;
55     private final long maxWorkTime;
56     private final int queueSize;
57     private final T handler;
58
59     // Updated from netty only
60     private long lastBarrierNanos = System.nanoTime();
61     private OutboundQueueImpl currentQueue;
62     private boolean barrierTimerEnabled;
63     private int nonBarrierMessages;
64     private long lastXid = 0;
65
66     // Passed to executor to request triggering of flush
67     private final Runnable flushRunnable = new Runnable() {
68         @Override
69         public void run() {
70             flush();
71         }
72     };
73     private final Runnable barrierRunnable = new Runnable() {
74         @Override
75         public void run() {
76             barrier();
77         }
78     };
79
80     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
81         final int queueSize, final long maxBarrierNanos) {
82         this.parent = Preconditions.checkNotNull(parent);
83         this.handler = Preconditions.checkNotNull(handler);
84         Preconditions.checkArgument(queueSize > 0);
85         this.queueSize = queueSize;
86         Preconditions.checkArgument(maxBarrierNanos > 0);
87         this.maxBarrierNanos = maxBarrierNanos;
88         this.address = address;
89         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
90
91         LOG.debug("Queue manager instantiated with queue size {}", queueSize);
92         createQueue();
93     }
94
95     T getHandler() {
96         return handler;
97     }
98
99     @Override
100     public void close() {
101         handler.onConnectionQueueChanged(null);
102     }
103
104     private void retireQueue(final OutboundQueueImpl queue) {
105         if (queueCache.offer(queue)) {
106             LOG.trace("Saving queue {} for later reuse", queue);
107         } else {
108             LOG.trace("Queue {} thrown away", queue);
109         }
110     }
111
112     private void createQueue() {
113         final long baseXid = lastXid;
114         lastXid += queueSize + 1;
115
116         final OutboundQueueImpl cached = queueCache.poll();
117         final OutboundQueueImpl queue;
118         if (cached != null) {
119             queue = cached.reuse(baseXid);
120             LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
121         } else {
122             queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
123             LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
124         }
125
126         activeQueues.add(queue);
127         currentQueue = queue;
128         handler.onConnectionQueueChanged(queue);
129     }
130
131     private void scheduleBarrierTimer(final long now) {
132         long next = lastBarrierNanos + maxBarrierNanos;
133         if (next < now) {
134             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
135             next = now + maxBarrierNanos;
136         }
137
138         final long delay = next - now;
139         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
140         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
141         barrierTimerEnabled = true;
142     }
143
144     private void scheduleBarrierMessage() {
145         final Long xid = currentQueue.reserveBarrierIfNeeded();
146         if (xid == null) {
147             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
148             return;
149         }
150
151         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
152         LOG.trace("Barrier XID {} scheduled", xid);
153     }
154
155     /**
156      * Flush an entry from the queue.
157      *
158      * @param now Time reference for 'now'. We take this as an argument, as
159      *            we need a timestamp to mark barrier messages we see swinging
160      *            by. That timestamp does not need to be completely accurate,
161      *            hence we use the flush start time. Alternative would be to
162      *            measure System.nanoTime() for each barrier -- needlessly
163      *            adding overhead.
164      *
165      * @return Entry which was flushed, null if no entry is ready.
166      */
167     OfHeader flushEntry(final long now) {
168         final OfHeader message = currentQueue.flushEntry();
169         if (currentQueue.isFlushed()) {
170             LOG.debug("Queue {} is fully flushed", currentQueue);
171             createQueue();
172         }
173
174         if (message == null) {
175             return null;
176         }
177
178         if (message instanceof BarrierInput) {
179             LOG.trace("Barrier message seen, resetting counters");
180             nonBarrierMessages = 0;
181             lastBarrierNanos = now;
182         } else {
183             nonBarrierMessages++;
184             if (nonBarrierMessages >= queueSize) {
185                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
186                 scheduleBarrierMessage();
187             } else if (!barrierTimerEnabled) {
188                 scheduleBarrierTimer(now);
189             }
190         }
191
192         return message;
193     }
194
195     /**
196      * Invoked whenever a message comes in from the switch. Runs matching
197      * on all active queues in an attempt to complete a previous request.
198      *
199      * @param message Potential response message
200      * @return True if the message matched a previous request, false otherwise.
201      */
202     boolean onMessage(final OfHeader message) {
203         LOG.trace("Attempting to pair message {} to a request", message);
204
205         Iterator<OutboundQueueImpl> it = activeQueues.iterator();
206         while (it.hasNext()) {
207             final OutboundQueueImpl queue = it.next();
208             final OutboundQueueEntry entry = queue.pairRequest(message);
209
210             if (entry == null) {
211                 continue;
212             }
213
214             LOG.trace("Queue {} accepted response {}", queue, message);
215
216             // This has been a barrier request, we need to flush all
217             // previous queues
218             if (entry.isBarrier() && activeQueues.size() > 1) {
219                 LOG.trace("Queue {} indicated request was a barrier", queue);
220
221                 it = activeQueues.iterator();
222                 while (it.hasNext()) {
223                     final OutboundQueueImpl q = it.next();
224
225                     // We want to complete all queues before the current one, we will
226                     // complete the current queue below
227                     if (!queue.equals(q)) {
228                         LOG.trace("Queue {} is implied finished", q);
229                         q.completeAll();
230                         it.remove();
231                         retireQueue(q);
232                     } else {
233                         break;
234                     }
235                 }
236             }
237
238             if (queue.isFinished()) {
239                 LOG.trace("Queue {} is finished", queue);
240                 it.remove();
241                 retireQueue(queue);
242             }
243
244             return true;
245         }
246
247         LOG.debug("Failed to find completion for message {}", message);
248         return false;
249     }
250
251     private void scheduleFlush() {
252         if (parent.getChannel().isWritable()) {
253             if (flushScheduled.compareAndSet(false, true)) {
254                 LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
255                 parent.getChannel().eventLoop().execute(flushRunnable);
256             } else {
257                 LOG.trace("Flush task is already present on channel {}", parent.getChannel());
258             }
259         } else {
260             LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
261         }
262     }
263
264     void ensureFlushing(final OutboundQueueImpl queue) {
265         Preconditions.checkState(currentQueue.equals(queue));
266         scheduleFlush();
267     }
268
269     /**
270      * Periodic barrier check.
271      */
272     protected void barrier() {
273         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
274         barrierTimerEnabled = false;
275         if (currentQueue == null) {
276             LOG.trace("Channel shut down, not processing barrier");
277             return;
278         }
279
280         final long now = System.nanoTime();
281         final long sinceLast = now - lastBarrierNanos;
282         if (sinceLast >= maxBarrierNanos) {
283             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
284             // FIXME: we should be tracking requests/responses instead of this
285             if (nonBarrierMessages == 0) {
286                 LOG.trace("No messages written since last barrier, not issuing one");
287             } else {
288                 scheduleBarrierMessage();
289             }
290         }
291     }
292
293     /**
294      * Perform a single flush operation.
295      */
296     protected void flush() {
297         final long start = System.nanoTime();
298         final long deadline = start + maxWorkTime;
299
300         LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
301
302         long messages = 0;
303         for (;; ++messages) {
304             if (!parent.getChannel().isWritable()) {
305                 LOG.trace("Channel is no longer writable");
306                 break;
307             }
308
309             final OfHeader message = flushEntry(start);
310             if (message == null) {
311                 LOG.trace("The queue is completely drained");
312                 break;
313             }
314
315             final Object wrapper;
316             if (address == null) {
317                 wrapper = new MessageListenerWrapper(message, null);
318             } else {
319                 wrapper = new UdpMessageListenerWrapper(message, null, address);
320             }
321             parent.getChannel().write(wrapper);
322
323             /*
324              * Check every WORKTIME_RECHECK_MSGS for exceeded time.
325              *
326              * XXX: given we already measure our flushing throughput, we
327              *      should be able to perform dynamic adjustments here.
328              *      is that additional complexity needed, though?
329              */
330             if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
331                 LOG.trace("Exceeded allotted work time {}us",
332                         TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
333                 break;
334             }
335         }
336
337         if (messages > 0) {
338             LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
339             parent.getChannel().flush();
340         }
341
342         final long stop = System.nanoTime();
343         LOG.debug("Flushed {} messages in {}us to channel {}",
344                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
345
346         /*
347          * We are almost ready to terminate. This is a bit tricky, because
348          * we do not want to have a race window where a message would be
349          * stuck on the queue without a flush being scheduled.
350          *
351          * So we mark ourselves as not running and then re-check if a
352          * flush out is needed. That will re-synchronized with other threads
353          * such that only one flush is scheduled at any given time.
354          */
355         if (!flushScheduled.compareAndSet(true, false)) {
356             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
357         }
358
359         conditionalFlush();
360     }
361
362
363     /**
364      * Schedule a queue flush if it is not empty and the channel is found
365      * to be writable. May only be called from Netty context.
366      */
367     private void conditionalFlush() {
368         if (!currentQueue.isEmpty()) {
369             scheduleFlush();
370         } else {
371             LOG.trace("Queue is empty, no flush needed");
372         }
373     }
374
375     private void conditionalFlush(final ChannelHandlerContext ctx) {
376         Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
377         conditionalFlush();
378     }
379
380     @Override
381     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
382         super.channelActive(ctx);
383         conditionalFlush(ctx);
384     }
385
386     @Override
387     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
388         super.channelWritabilityChanged(ctx);
389         conditionalFlush(ctx);
390     }
391
392     @Override
393     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
394         super.channelInactive(ctx);
395
396         long entries = 0;
397         LOG.debug("Channel shutdown, flushing queue...");
398         handler.onConnectionQueueChanged(null);
399
400         final Throwable cause = new RejectedExecutionException("Channel disconnected");
401         for (OutboundQueueImpl queue : activeQueues) {
402             entries += queue.failAll(cause);
403         }
404         activeQueues.clear();
405
406         LOG.debug("Flushed {} queue entries", entries);
407     }
408
409     @Override
410     public String toString() {
411         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
412     }
413 }