Merge "Barrier turn on/off - Split ConnectionAdapter functionality" into stable/lithium
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelInboundHandlerAdapter;
13 import io.netty.util.concurrent.Future;
14 import io.netty.util.concurrent.GenericFutureListener;
15 import java.net.InetSocketAddress;
16 import java.util.concurrent.TimeUnit;
17 import java.util.concurrent.atomic.AtomicBoolean;
18 import javax.annotation.Nonnull;
19 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
29     private static enum PipelineState {
30         /**
31          * Netty thread is potentially idle, no assumptions
32          * can be made about its state.
33          */
34         IDLE,
35         /**
36          * Netty thread is currently reading, once the read completes,
37          * if will flush the queue in the {@link #FLUSHING} state.
38          */
39         READING,
40         /**
41          * Netty thread is currently performing a flush on the queue.
42          * It will then transition to {@link #IDLE} state.
43          */
44         WRITING,
45     }
46
47     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
48
49     /**
50      * Default low write watermark. Channel will become writable when number of outstanding
51      * bytes dips below this value.
52      */
53     private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
54
55     /**
56      * Default write high watermark. Channel will become un-writable when number of
57      * outstanding bytes hits this value.
58      */
59     private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
60
61     private final AtomicBoolean flushScheduled = new AtomicBoolean();
62     private final StackedOutboundQueue currentQueue;
63     private final ConnectionAdapterImpl parent;
64     private final InetSocketAddress address;
65     private final int maxNonBarrierMessages;
66     private final long maxBarrierNanos;
67     private final T handler;
68
69     // Accessed concurrently
70     private volatile PipelineState state = PipelineState.IDLE;
71
72     // Updated from netty only
73     private boolean alreadyReading;
74     private boolean barrierTimerEnabled;
75     private long lastBarrierNanos = System.nanoTime();
76     private int nonBarrierMessages;
77     private boolean shuttingDown;
78
79     // Passed to executor to request triggering of flush
80     private final Runnable flushRunnable = new Runnable() {
81         @Override
82         public void run() {
83             flush();
84         }
85     };
86
87     // Passed to executor to request a periodic barrier check
88     private final Runnable barrierRunnable = new Runnable() {
89         @Override
90         public void run() {
91             barrier();
92         }
93     };
94
95     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
96         final int maxNonBarrierMessages, final long maxBarrierNanos) {
97         this.parent = Preconditions.checkNotNull(parent);
98         this.handler = Preconditions.checkNotNull(handler);
99         Preconditions.checkArgument(maxNonBarrierMessages > 0);
100         this.maxNonBarrierMessages = maxNonBarrierMessages;
101         Preconditions.checkArgument(maxBarrierNanos > 0);
102         this.maxBarrierNanos = maxBarrierNanos;
103         this.address = address;
104
105         currentQueue = new StackedOutboundQueue(this);
106         LOG.debug("Queue manager instantiated with queue {}", currentQueue);
107         handler.onConnectionQueueChanged(currentQueue);
108     }
109
110     T getHandler() {
111         return handler;
112     }
113
114     @Override
115     public void close() {
116         handler.onConnectionQueueChanged(null);
117     }
118
119     private void scheduleBarrierTimer(final long now) {
120         long next = lastBarrierNanos + maxBarrierNanos;
121         if (next < now) {
122             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
123             next = now + maxBarrierNanos;
124         }
125
126         final long delay = next - now;
127         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
128         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
129         barrierTimerEnabled = true;
130     }
131
132     private void scheduleBarrierMessage() {
133         final Long xid = currentQueue.reserveBarrierIfNeeded();
134         if (xid == null) {
135             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
136             return;
137         }
138
139         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
140         LOG.trace("Barrier XID {} scheduled", xid);
141     }
142
143
144     /**
145      * Invoked whenever a message comes in from the switch. Runs matching
146      * on all active queues in an attempt to complete a previous request.
147      *
148      * @param message Potential response message
149      * @return True if the message matched a previous request, false otherwise.
150      */
151     boolean onMessage(final OfHeader message) {
152         LOG.trace("Attempting to pair message {} to a request", message);
153
154         return currentQueue.pairRequest(message);
155     }
156
157     private void scheduleFlush() {
158         if (flushScheduled.compareAndSet(false, true)) {
159             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
160             parent.getChannel().eventLoop().execute(flushRunnable);
161         } else {
162             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
163         }
164     }
165
166     /**
167      * Periodic barrier check.
168      */
169     protected void barrier() {
170         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
171         barrierTimerEnabled = false;
172         if (shuttingDown) {
173             LOG.trace("Channel shut down, not processing barrier");
174             return;
175         }
176
177         final long now = System.nanoTime();
178         final long sinceLast = now - lastBarrierNanos;
179         if (sinceLast >= maxBarrierNanos) {
180             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
181             // FIXME: we should be tracking requests/responses instead of this
182             if (nonBarrierMessages == 0) {
183                 LOG.trace("No messages written since last barrier, not issuing one");
184             } else {
185                 scheduleBarrierMessage();
186             }
187         }
188     }
189
190     private void rescheduleFlush() {
191         /*
192          * We are almost ready to terminate. This is a bit tricky, because
193          * we do not want to have a race window where a message would be
194          * stuck on the queue without a flush being scheduled.
195          *
196          * So we mark ourselves as not running and then re-check if a
197          * flush out is needed. That will re-synchronized with other threads
198          * such that only one flush is scheduled at any given time.
199          */
200         if (!flushScheduled.compareAndSet(true, false)) {
201             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
202         }
203
204         conditionalFlush();
205     }
206
207     private void writeAndFlush() {
208         state = PipelineState.WRITING;
209
210         final long start = System.nanoTime();
211
212         final int entries = currentQueue.writeEntries(parent.getChannel(), start);
213         if (entries > 0) {
214             LOG.trace("Flushing channel {}", parent.getChannel());
215             parent.getChannel().flush();
216         }
217
218         if (LOG.isDebugEnabled()) {
219             final long stop = System.nanoTime();
220             LOG.debug("Flushed {} messages to channel {} in {}us", entries,
221                 parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start));
222         }
223
224         state = PipelineState.IDLE;
225     }
226
227     /**
228      * Perform a single flush operation. We keep it here so we do not generate
229      * syntetic accessors for private fields. Otherwise it could be moved into
230      * {@link #flushRunnable}.
231      */
232     protected void flush() {
233         // If the channel is gone, just flush whatever is not completed
234         if (!shuttingDown) {
235             LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
236             writeAndFlush();
237             rescheduleFlush();
238         } else if (currentQueue.finishShutdown()) {
239             handler.onConnectionQueueChanged(null);
240             LOG.debug("Channel {} shutdown complete", parent.getChannel());
241         } else {
242             LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
243             rescheduleFlush();
244         }
245     }
246
247     /**
248      * Schedule a queue flush if it is not empty and the channel is found
249      * to be writable. May only be called from Netty context.
250      */
251     private void conditionalFlush() {
252         if (currentQueue.needsFlush()) {
253             if (shuttingDown || parent.getChannel().isWritable()) {
254                 scheduleFlush();
255             } else {
256                 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
257             }
258         } else {
259             LOG.trace("Queue is empty, no flush needed");
260         }
261     }
262
263     @Override
264     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
265         super.channelActive(ctx);
266         conditionalFlush();
267     }
268
269     @Override
270     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
271         /*
272          * Tune channel write buffering. We increase the writability window
273          * to ensure we can flush an entire queue segment in one go. We definitely
274          * want to keep the difference above 64k, as that will ensure we use jam-packed
275          * TCP packets. UDP will fragment as appropriate.
276          */
277         ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
278         ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
279
280         super.handlerAdded(ctx);
281     }
282
283     @Override
284     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
285         super.channelWritabilityChanged(ctx);
286
287         // The channel is writable again. There may be a flush task on the way, but let's
288         // steal its work, potentially decreasing latency. Since there is a window between
289         // now and when it will run, it may still pick up some more work to do.
290         LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
291         writeAndFlush();
292     }
293
294     @Override
295     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
296         super.channelInactive(ctx);
297
298         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
299
300         shuttingDown = true;
301         final long entries = currentQueue.startShutdown(ctx.channel());
302         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
303
304         scheduleFlush();
305     }
306
307     @Override
308     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
309         // Netty does not provide a 'start reading' callback, so this is our first
310         // (and repeated) chance to detect reading. Since this callback can be invoked
311         // multiple times, we keep a boolean we check. That prevents a volatile write
312         // on repeated invocations. It will be cleared in channelReadComplete().
313         if (!alreadyReading) {
314             alreadyReading = true;
315             state = PipelineState.READING;
316         }
317         super.channelRead(ctx, msg);
318     }
319
320     @Override
321     public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
322         super.channelReadComplete(ctx);
323
324         // Run flush regardless of writability. This is not strictly required, as
325         // there may be a scheduled flush. Instead of canceling it, which is expensive,
326         // we'll steal its work. Note that more work may accumulate in the time window
327         // between now and when the task will run, so it may not be a no-op after all.
328         //
329         // The reason for this is to will the output buffer before we go into selection
330         // phase. This will make sure the pipe is full (in which case our next wake up
331         // will be the queue becoming writable).
332         writeAndFlush();
333     }
334
335     @Override
336     public String toString() {
337         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
338     }
339
340     void ensureFlushing() {
341         // If the channel is not writable, there's no point in waking up,
342         // once we become writable, we will run a full flush
343         if (!parent.getChannel().isWritable()) {
344             return;
345         }
346
347         // We are currently reading something, just a quick sync to ensure we will in fact
348         // flush state.
349         final PipelineState localState = state;
350         LOG.debug("Synchronize on pipeline state {}", localState);
351         switch (localState) {
352         case READING:
353             // Netty thread is currently reading, it will flush the pipeline once it
354             // finishes reading. This is a no-op situation.
355             break;
356         case WRITING:
357         case IDLE:
358         default:
359             // We cannot rely on the change being flushed, schedule a request
360             scheduleFlush();
361         }
362     }
363
364     void onEchoRequest(final EchoRequestMessage message) {
365         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
366                 .setVersion(message.getVersion()).setXid(message.getXid()).build();
367         parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply));
368     }
369
370     /**
371      * Write a message into the underlying channel.
372      *
373      * @param now Time reference for 'now'. We take this as an argument, as
374      *            we need a timestamp to mark barrier messages we see swinging
375      *            by. That timestamp does not need to be completely accurate,
376      *            hence we use the flush start time. Alternative would be to
377      *            measure System.nanoTime() for each barrier -- needlessly
378      *            adding overhead.
379      */
380     void writeMessage(final OfHeader message, final long now) {
381         final Object wrapper = makeMessageListenerWrapper(message);
382         parent.getChannel().write(wrapper);
383
384         if (message instanceof BarrierInput) {
385             LOG.trace("Barrier message seen, resetting counters");
386             nonBarrierMessages = 0;
387             lastBarrierNanos = now;
388         } else {
389             nonBarrierMessages++;
390             if (nonBarrierMessages >= maxNonBarrierMessages) {
391                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
392                 scheduleBarrierMessage();
393             } else if (!barrierTimerEnabled) {
394                 scheduleBarrierTimer(now);
395             }
396         }
397     }
398
399     /**
400      * Wraps outgoing message and includes listener attached to this message
401      * which is send to OFEncoder for serialization. Correct wrapper is
402      * selected by communication pipeline.
403      *
404      * @return
405      */
406     private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
407         Preconditions.checkArgument(msg != null);
408
409         if (address == null) {
410             return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
411         }
412         return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address);
413     }
414
415     /* NPE are coming from {@link OFEncoder#encode} from catch block and we don't wish to lost it */
416     private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = new GenericFutureListener<Future<Void>>() {
417
418         private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener");
419
420         @Override
421         public void operationComplete(final Future<Void> future) throws Exception {
422             if (future.cause() != null) {
423                 LOGGER.warn("Message encoding fail !", future.cause());
424             }
425         }
426     };
427 }