BUG-3219: Introduce OutboundQueueHandler and related interfaces
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Verify;
12 import io.netty.channel.ChannelHandlerContext;
13 import io.netty.channel.ChannelInboundHandlerAdapter;
14 import java.net.InetSocketAddress;
15 import java.util.ArrayDeque;
16 import java.util.Iterator;
17 import java.util.LinkedList;
18 import java.util.Queue;
19 import java.util.concurrent.RejectedExecutionException;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
22 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
29     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
30
31     /**
32      * This is the default upper bound we place on the flush task running
33      * a single iteration. We relinquish control after about this amount
34      * of time.
35      */
36     private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
37
38     /**
39      * We re-check the time spent flushing every this many messages. We do this because
40      * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
41      * or similar to disable the feature.
42      */
43     private static final int WORKTIME_RECHECK_MSGS = 64;
44
45     /**
46      * We maintain a cache of this many previous queues for later reuse.
47      */
48     private static final int QUEUE_CACHE_SIZE = 4;
49
50     private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
51     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
52     private final ConnectionAdapterImpl parent;
53     private final InetSocketAddress address;
54     private final long maxBarrierNanos;
55     private final long maxWorkTime;
56     private final int queueSize;
57     private final T handler;
58
59     /*
60      * Instead of using an AtomicBoolean object, we use these two. It saves us
61      * from allocating an extra object.
62      */
63     @SuppressWarnings("rawtypes")
64     private static final AtomicIntegerFieldUpdater<OutboundQueueManager> FLUSH_SCHEDULED_UPDATER =
65             AtomicIntegerFieldUpdater.newUpdater(OutboundQueueManager.class, "flushScheduled");
66     private volatile int flushScheduled = 0;
67
68     // Updated from netty only
69     private long lastBarrierNanos = System.nanoTime();
70     private OutboundQueueImpl currentQueue;
71     private int nonBarrierMessages;
72     private long lastXid = 0;
73
74     // Passed to executor to request triggering of flush
75     private final Runnable flushRunnable = new Runnable() {
76         @Override
77         public void run() {
78             flush();
79         }
80     };
81     private final Runnable barrierRunnable = new Runnable() {
82         @Override
83         public void run() {
84             barrier();
85         }
86     };
87
88     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
89         final int queueSize, final long maxBarrierNanos) {
90         this.parent = Preconditions.checkNotNull(parent);
91         this.handler = Preconditions.checkNotNull(handler);
92         Preconditions.checkArgument(queueSize > 0);
93         this.queueSize = queueSize;
94         Preconditions.checkArgument(maxBarrierNanos > 0);
95         this.maxBarrierNanos = maxBarrierNanos;
96         this.address = address;
97         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
98
99         LOG.debug("Queue manager instantiated with queue size {}", queueSize);
100         createQueue();
101         scheduleBarrierTimer(lastBarrierNanos);
102     }
103
104     T getHandler() {
105         return handler;
106     }
107
108     @Override
109     public void close() {
110         handler.onConnectionQueueChanged(null);
111     }
112
113     private void retireQueue(final OutboundQueueImpl queue) {
114         if (queueCache.offer(queue)) {
115             LOG.debug("Saving queue {} for later reuse", queue);
116         } else {
117             LOG.debug("Queue {} thrown away", queue);
118         }
119     }
120
121     private void createQueue() {
122         final long baseXid = lastXid;
123         lastXid += queueSize + 1;
124
125         final OutboundQueueImpl cached = queueCache.poll();
126         final OutboundQueueImpl queue;
127         if (cached != null) {
128             queue = cached.reuse(baseXid);
129             LOG.debug("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
130         } else {
131             queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
132             LOG.debug("Allocated new queue {} on channel {}", queue, parent.getChannel());
133         }
134
135         activeQueues.add(queue);
136         currentQueue = queue;
137         handler.onConnectionQueueChanged(queue);
138     }
139
140     private void scheduleBarrierTimer(final long now) {
141         long next = lastBarrierNanos + maxBarrierNanos;
142         if (next < now) {
143             LOG.debug("Attempted to schedule barrier in the past, reset maximum)");
144             next = now + maxBarrierNanos;
145         }
146
147         final long delay = next - now;
148         LOG.debug("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
149         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
150     }
151
152     private void scheduleBarrierMessage() {
153         final Long xid = currentQueue.reserveEntry(true);
154         Verify.verifyNotNull(xid);
155
156         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
157         LOG.debug("Barrier XID {} scheduled", xid);
158
159         // We can see into the future when compared to flushEntry(), as that
160         // codepath may be lagging behind on messages. Resetting the counter
161         // here ensures that flushEntry() will not attempt to issue a flush
162         // request. Note that we do not reset current time, as that should
163         // reflect when we sent the message for real.
164         nonBarrierMessages = 0;
165     }
166
167     /**
168      * Flush an entry from the queue.
169      *
170      * @param now Time reference for 'now'. We take this as an argument, as
171      *            we need a timestamp to mark barrier messages we see swinging
172      *            by. That timestamp does not need to be completely accurate,
173      *            hence we use the flush start time. Alternative would be to
174      *            measure System.nanoTime() for each barrier -- needlessly
175      *            adding overhead.
176      *
177      * @return Entry which was flushed, null if no entry is ready.
178      */
179     OfHeader flushEntry(final long now) {
180         final OfHeader message = currentQueue.flushEntry();
181         if (currentQueue.isFlushed()) {
182             LOG.debug("Queue {} is fully flushed", currentQueue);
183             createQueue();
184         }
185
186         if (message == null) {
187             return null;
188         }
189
190         if (message instanceof BarrierInput) {
191             LOG.debug("Barrier message seen, resetting counters");
192             nonBarrierMessages = 0;
193             lastBarrierNanos = now;
194         } else {
195             nonBarrierMessages++;
196             if (nonBarrierMessages >= queueSize) {
197                 LOG.debug("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
198                 scheduleBarrierMessage();
199             }
200         }
201
202         return message;
203     }
204
205     /**
206      * Invoked whenever a message comes in from the switch. Runs matching
207      * on all active queues in an attempt to complete a previous request.
208      *
209      * @param message Potential response message
210      * @return True if the message matched a previous request, false otherwise.
211      */
212     boolean onMessage(final OfHeader message) {
213         LOG.debug("Attempting to pair message {} to a request", message);
214
215         Iterator<OutboundQueueImpl> it = activeQueues.iterator();
216         while (it.hasNext()) {
217             final OutboundQueueImpl queue = it.next();
218             final OutboundQueueEntry entry = queue.pairRequest(message);
219
220             if (entry == null) {
221                 continue;
222             }
223
224             LOG.debug("Queue {} accepted response {}", queue, message);
225
226             // This has been a barrier request, we need to flush all
227             // previous queues
228             if (entry.isBarrier() && activeQueues.size() > 1) {
229                 LOG.debug("Queue {} indicated request was a barrier", queue);
230
231                 it = activeQueues.iterator();
232                 while (it.hasNext()) {
233                     final OutboundQueueImpl q = it.next();
234
235                     // We want to complete all queues before the current one, we will
236                     // complete the current queue below
237                     if (!queue.equals(q)) {
238                         LOG.debug("Queue {} is implied finished", q);
239                         q.completeAll();
240                         it.remove();
241                         retireQueue(q);
242                     } else {
243                         break;
244                     }
245                 }
246             }
247
248             if (queue.isFinished()) {
249                 LOG.debug("Queue {} is finished", queue);
250                 it.remove();
251                 retireQueue(queue);
252             }
253
254             return true;
255         }
256
257         LOG.debug("Failed to find completion for message {}", message);
258         return false;
259     }
260
261     private void scheduleFlush() {
262         if (parent.getChannel().isWritable()) {
263             if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
264                 LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
265                 parent.getChannel().eventLoop().execute(flushRunnable);
266             } else {
267                 LOG.trace("Flush task is already present on channel {}", parent.getChannel());
268             }
269         } else {
270             LOG.trace("Channel {} is not writable, not issuing a flush", parent.getChannel());
271         }
272     }
273
274     void ensureFlushing(final OutboundQueueImpl queue) {
275         Preconditions.checkState(currentQueue.equals(queue));
276         scheduleFlush();
277     }
278
279     /**
280      * Periodic barrier check.
281      */
282     protected void barrier() {
283         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
284         if (currentQueue == null) {
285             LOG.debug("Channel shut down, not processing barrier");
286             return;
287         }
288
289         final long now = System.nanoTime();
290         final long sinceLast = now - lastBarrierNanos;
291         if (sinceLast >= maxBarrierNanos) {
292             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
293             // FIXME: we should be tracking requests/responses instead of this
294             if (nonBarrierMessages == 0) {
295                 LOG.debug("No messages written since last barrier, not issuing one");
296             } else {
297                 scheduleBarrierMessage();
298             }
299         }
300
301         scheduleBarrierTimer(now);
302     }
303
304     /**
305      * Perform a single flush operation.
306      */
307     protected void flush() {
308         final long start = System.nanoTime();
309         final long deadline = start + maxWorkTime;
310
311         LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
312
313         long messages = 0;
314         for (;; ++messages) {
315             if (!parent.getChannel().isWritable()) {
316                 LOG.trace("Channel is no longer writable");
317                 break;
318             }
319
320             final OfHeader message = flushEntry(start);
321             if (message == null) {
322                 LOG.trace("The queue is completely drained");
323                 break;
324             }
325
326             final Object wrapper;
327             if (address == null) {
328                 wrapper = new MessageListenerWrapper(message, null);
329             } else {
330                 wrapper = new UdpMessageListenerWrapper(message, null, address);
331             }
332             parent.getChannel().write(wrapper);
333
334             /*
335              * Check every WORKTIME_RECHECK_MSGS for exceeded time.
336              *
337              * XXX: given we already measure our flushing throughput, we
338              *      should be able to perform dynamic adjustments here.
339              *      is that additional complexity needed, though?
340              */
341             if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
342                 LOG.trace("Exceeded allotted work time {}us",
343                         TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
344                 break;
345             }
346         }
347
348         if (messages > 0) {
349             LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
350             parent.getChannel().flush();
351         }
352
353         final long stop = System.nanoTime();
354         LOG.debug("Flushed {} messages in {}us to channel {}",
355                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
356
357         /*
358          * We are almost ready to terminate. This is a bit tricky, because
359          * we do not want to have a race window where a message would be
360          * stuck on the queue without a flush being scheduled.
361          *
362          * So we mark ourselves as not running and then re-check if a
363          * flush out is needed. That will re-synchronized with other threads
364          * such that only one flush is scheduled at any given time.
365          */
366         if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
367             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
368         }
369
370         conditionalFlush();
371     }
372
373
374     /**
375      * Schedule a queue flush if it is not empty and the channel is found
376      * to be writable. May only be called from Netty context.
377      */
378     private void conditionalFlush() {
379         if (!currentQueue.isEmpty()) {
380             scheduleFlush();
381         } else {
382             LOG.trace("Queue is empty, no flush needed");
383         }
384     }
385
386     private void conditionalFlush(final ChannelHandlerContext ctx) {
387         Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
388         conditionalFlush();
389     }
390
391     @Override
392     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
393         super.channelActive(ctx);
394         conditionalFlush(ctx);
395     }
396
397     @Override
398     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
399         super.channelWritabilityChanged(ctx);
400         conditionalFlush(ctx);
401     }
402
403     @Override
404     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
405         super.channelInactive(ctx);
406
407         long entries = 0;
408         LOG.debug("Channel shutdown, flushing queue...");
409         handler.onConnectionQueueChanged(null);
410
411         final Throwable cause = new RejectedExecutionException("Channel disconnected");
412         for (OutboundQueueImpl queue : activeQueues) {
413             entries += queue.failAll(cause);
414         }
415         activeQueues.clear();
416
417         LOG.debug("Flushed {} queue entries", entries);
418     }
419
420     @Override
421     public String toString() {
422         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled);
423     }
424 }