BUG-3219: Fix OutboundQueue cleanup on channel failure
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelInboundHandlerAdapter;
13 import java.net.InetSocketAddress;
14 import java.util.ArrayDeque;
15 import java.util.Iterator;
16 import java.util.LinkedList;
17 import java.util.Queue;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicBoolean;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueException;
21 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
28     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
29
30     /**
31      * This is the default upper bound we place on the flush task running
32      * a single iteration. We relinquish control after about this amount
33      * of time.
34      */
35     private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
36
37     /**
38      * We re-check the time spent flushing every this many messages. We do this because
39      * checking after each message may prove to be CPU-intensive. Set to Integer.MAX_VALUE
40      * or similar to disable the feature.
41      */
42     private static final int WORKTIME_RECHECK_MSGS = 64;
43
44     /**
45      * We maintain a cache of this many previous queues for later reuse.
46      */
47     private static final int QUEUE_CACHE_SIZE = 4;
48
49     private final Queue<OutboundQueueImpl> queueCache = new ArrayDeque<>(QUEUE_CACHE_SIZE);
50     private final Queue<OutboundQueueImpl> activeQueues = new LinkedList<>();
51     private final AtomicBoolean flushScheduled = new AtomicBoolean();
52     private final ConnectionAdapterImpl parent;
53     private final InetSocketAddress address;
54     private final long maxBarrierNanos;
55     private final long maxWorkTime;
56     private final int queueSize;
57     private final T handler;
58
59     // Updated from netty only
60     private long lastBarrierNanos = System.nanoTime();
61     private OutboundQueueImpl currentQueue;
62     private boolean barrierTimerEnabled;
63     private int nonBarrierMessages;
64     private long lastXid = 0;
65
66     // Passed to executor to request triggering of flush
67     private final Runnable flushRunnable = new Runnable() {
68         @Override
69         public void run() {
70             flush();
71         }
72     };
73
74     // Passed to executor to request a periodic barrier check
75     private final Runnable barrierRunnable = new Runnable() {
76         @Override
77         public void run() {
78             barrier();
79         }
80     };
81
82     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
83         final int queueSize, final long maxBarrierNanos) {
84         this.parent = Preconditions.checkNotNull(parent);
85         this.handler = Preconditions.checkNotNull(handler);
86         Preconditions.checkArgument(queueSize > 0);
87         this.queueSize = queueSize;
88         Preconditions.checkArgument(maxBarrierNanos > 0);
89         this.maxBarrierNanos = maxBarrierNanos;
90         this.address = address;
91         this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
92
93         LOG.debug("Queue manager instantiated with queue size {}", queueSize);
94         createQueue();
95     }
96
97     T getHandler() {
98         return handler;
99     }
100
101     @Override
102     public void close() {
103         handler.onConnectionQueueChanged(null);
104     }
105
106     private void retireQueue(final OutboundQueueImpl queue) {
107         if (queueCache.offer(queue)) {
108             LOG.trace("Saving queue {} for later reuse", queue);
109         } else {
110             LOG.trace("Queue {} thrown away", queue);
111         }
112     }
113
114     private void createQueue() {
115         final long baseXid = lastXid;
116         lastXid += queueSize + 1;
117
118         final OutboundQueueImpl cached = queueCache.poll();
119         final OutboundQueueImpl queue;
120         if (cached != null) {
121             queue = cached.reuse(baseXid);
122             LOG.trace("Reusing queue {} as {} on channel {}", cached, queue, parent.getChannel());
123         } else {
124             queue = new OutboundQueueImpl(this, baseXid, queueSize + 1);
125             LOG.trace("Allocated new queue {} on channel {}", queue, parent.getChannel());
126         }
127
128         activeQueues.add(queue);
129         currentQueue = queue;
130         handler.onConnectionQueueChanged(queue);
131     }
132
133     private void scheduleBarrierTimer(final long now) {
134         long next = lastBarrierNanos + maxBarrierNanos;
135         if (next < now) {
136             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
137             next = now + maxBarrierNanos;
138         }
139
140         final long delay = next - now;
141         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
142         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
143         barrierTimerEnabled = true;
144     }
145
146     private void scheduleBarrierMessage() {
147         final Long xid = currentQueue.reserveBarrierIfNeeded();
148         if (xid == null) {
149             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
150             return;
151         }
152
153         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
154         LOG.trace("Barrier XID {} scheduled", xid);
155     }
156
157     /**
158      * Flush an entry from the queue.
159      *
160      * @param now Time reference for 'now'. We take this as an argument, as
161      *            we need a timestamp to mark barrier messages we see swinging
162      *            by. That timestamp does not need to be completely accurate,
163      *            hence we use the flush start time. Alternative would be to
164      *            measure System.nanoTime() for each barrier -- needlessly
165      *            adding overhead.
166      *
167      * @return Entry which was flushed, null if no entry is ready.
168      */
169     OfHeader flushEntry(final long now) {
170         final OfHeader message = currentQueue.flushEntry();
171         if (currentQueue.isFlushed()) {
172             LOG.debug("Queue {} is fully flushed", currentQueue);
173             createQueue();
174         }
175
176         if (message == null) {
177             return null;
178         }
179
180         if (message instanceof BarrierInput) {
181             LOG.trace("Barrier message seen, resetting counters");
182             nonBarrierMessages = 0;
183             lastBarrierNanos = now;
184         } else {
185             nonBarrierMessages++;
186             if (nonBarrierMessages >= queueSize) {
187                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
188                 scheduleBarrierMessage();
189             } else if (!barrierTimerEnabled) {
190                 scheduleBarrierTimer(now);
191             }
192         }
193
194         return message;
195     }
196
197     /**
198      * Invoked whenever a message comes in from the switch. Runs matching
199      * on all active queues in an attempt to complete a previous request.
200      *
201      * @param message Potential response message
202      * @return True if the message matched a previous request, false otherwise.
203      */
204     boolean onMessage(final OfHeader message) {
205         LOG.trace("Attempting to pair message {} to a request", message);
206
207         Iterator<OutboundQueueImpl> it = activeQueues.iterator();
208         while (it.hasNext()) {
209             final OutboundQueueImpl queue = it.next();
210             final OutboundQueueEntry entry = queue.pairRequest(message);
211
212             if (entry == null) {
213                 continue;
214             }
215
216             LOG.trace("Queue {} accepted response {}", queue, message);
217
218             // This has been a barrier request, we need to flush all
219             // previous queues
220             if (entry.isBarrier() && activeQueues.size() > 1) {
221                 LOG.trace("Queue {} indicated request was a barrier", queue);
222
223                 it = activeQueues.iterator();
224                 while (it.hasNext()) {
225                     final OutboundQueueImpl q = it.next();
226
227                     // We want to complete all queues before the current one, we will
228                     // complete the current queue below
229                     if (!queue.equals(q)) {
230                         LOG.trace("Queue {} is implied finished", q);
231                         q.completeAll();
232                         it.remove();
233                         retireQueue(q);
234                     } else {
235                         break;
236                     }
237                 }
238             }
239
240             if (queue.isFinished()) {
241                 LOG.trace("Queue {} is finished", queue);
242                 it.remove();
243                 retireQueue(queue);
244             }
245
246             return true;
247         }
248
249         LOG.debug("Failed to find completion for message {}", message);
250         return false;
251     }
252
253     private void scheduleFlush() {
254         if (flushScheduled.compareAndSet(false, true)) {
255             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
256             parent.getChannel().eventLoop().execute(flushRunnable);
257         } else {
258             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
259         }
260     }
261
262     void ensureFlushing(final OutboundQueueImpl queue) {
263         Preconditions.checkState(currentQueue.equals(queue));
264         scheduleFlush();
265     }
266
267     /**
268      * Periodic barrier check.
269      */
270     protected void barrier() {
271         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
272         barrierTimerEnabled = false;
273         if (currentQueue == null) {
274             LOG.trace("Channel shut down, not processing barrier");
275             return;
276         }
277
278         final long now = System.nanoTime();
279         final long sinceLast = now - lastBarrierNanos;
280         if (sinceLast >= maxBarrierNanos) {
281             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
282             // FIXME: we should be tracking requests/responses instead of this
283             if (nonBarrierMessages == 0) {
284                 LOG.trace("No messages written since last barrier, not issuing one");
285             } else {
286                 scheduleBarrierMessage();
287             }
288         }
289     }
290
291     /**
292      * Perform a single flush operation.
293      */
294     protected void flush() {
295         // If the channel is gone, just flush whatever is not completed
296         if (currentQueue == null) {
297             long entries = 0;
298
299             final Iterator<OutboundQueueImpl> it = activeQueues.iterator();
300             while (it.hasNext()) {
301                 final OutboundQueueImpl queue = it.next();
302                 entries += queue.failAll(OutboundQueueException.DEVICE_DISCONNECTED);
303                 if (queue.isFinished()) {
304                     LOG.trace("Cleared queue {}", queue);
305                     it.remove();
306                 }
307             }
308
309             LOG.debug("Cleared {} queue entries from channel {}", entries, parent.getChannel());
310             return;
311         }
312
313         final long start = System.nanoTime();
314         final long deadline = start + maxWorkTime;
315
316         LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
317
318         long messages = 0;
319         for (;; ++messages) {
320             if (!parent.getChannel().isWritable()) {
321                 LOG.trace("Channel is no longer writable");
322                 break;
323             }
324
325             final OfHeader message = flushEntry(start);
326             if (message == null) {
327                 LOG.trace("The queue is completely drained");
328                 break;
329             }
330
331             final Object wrapper;
332             if (address == null) {
333                 wrapper = new MessageListenerWrapper(message, null);
334             } else {
335                 wrapper = new UdpMessageListenerWrapper(message, null, address);
336             }
337             parent.getChannel().write(wrapper);
338
339             /*
340              * Check every WORKTIME_RECHECK_MSGS for exceeded time.
341              *
342              * XXX: given we already measure our flushing throughput, we
343              *      should be able to perform dynamic adjustments here.
344              *      is that additional complexity needed, though?
345              */
346             if ((messages % WORKTIME_RECHECK_MSGS) == 0 && System.nanoTime() >= deadline) {
347                 LOG.trace("Exceeded allotted work time {}us",
348                         TimeUnit.NANOSECONDS.toMicros(maxWorkTime));
349                 break;
350             }
351         }
352
353         if (messages > 0) {
354             LOG.debug("Flushing {} message(s) to channel {}", messages, parent.getChannel());
355             parent.getChannel().flush();
356         }
357
358         final long stop = System.nanoTime();
359         LOG.debug("Flushed {} messages in {}us to channel {}",
360                 messages, TimeUnit.NANOSECONDS.toMicros(stop - start), parent.getChannel());
361
362         /*
363          * We are almost ready to terminate. This is a bit tricky, because
364          * we do not want to have a race window where a message would be
365          * stuck on the queue without a flush being scheduled.
366          *
367          * So we mark ourselves as not running and then re-check if a
368          * flush out is needed. That will re-synchronized with other threads
369          * such that only one flush is scheduled at any given time.
370          */
371         if (!flushScheduled.compareAndSet(true, false)) {
372             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
373         }
374
375         conditionalFlush();
376     }
377
378     /**
379      * Schedule a queue flush if it is not empty and the channel is found
380      * to be writable. May only be called from Netty context.
381      */
382     private void conditionalFlush() {
383         if (!currentQueue.isEmpty()) {
384             scheduleFlush();
385         } else {
386             LOG.trace("Queue is empty, no flush needed");
387         }
388     }
389
390     private void conditionalFlush(final ChannelHandlerContext ctx) {
391         Preconditions.checkState(ctx.channel().equals(parent.getChannel()), "Inconsistent channel %s with context %s", parent.getChannel(), ctx);
392         conditionalFlush();
393     }
394
395     @Override
396     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
397         super.channelActive(ctx);
398         conditionalFlush(ctx);
399     }
400
401     @Override
402     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
403         super.channelWritabilityChanged(ctx);
404         conditionalFlush(ctx);
405     }
406
407     @Override
408     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
409         super.channelInactive(ctx);
410
411         LOG.debug("Channel {} shutdown, flushing queue...", parent.getChannel());
412         handler.onConnectionQueueChanged(null);
413         currentQueue = null;
414         queueCache.clear();
415
416         scheduleFlush();
417     }
418
419     @Override
420     public String toString() {
421         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
422     }
423 }