6e07a6dd9d064d8e8a58e99f39eb6ea0f1493989
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelInboundHandlerAdapter;
13 import java.net.InetSocketAddress;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.atomic.AtomicBoolean;
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
26     private static enum PipelineState {
27         /**
28          * Netty thread is potentially idle, no assumptions
29          * can be made about its state.
30          */
31         IDLE,
32         /**
33          * Netty thread is currently reading, once the read completes,
34          * if will flush the queue in the {@link #FLUSHING} state.
35          */
36         READING,
37         /**
38          * Netty thread is currently performing a flush on the queue.
39          * It will then transition to {@link #IDLE} state.
40          */
41         WRITING,
42     }
43
44     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
45
46     /**
47      * Default low write watermark. Channel will become writable when number of outstanding
48      * bytes dips below this value.
49      */
50     private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
51
52     /**
53      * Default write high watermark. Channel will become un-writable when number of
54      * outstanding bytes hits this value.
55      */
56     private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
57
58     private final AtomicBoolean flushScheduled = new AtomicBoolean();
59     private final StackedOutboundQueue currentQueue;
60     private final ConnectionAdapterImpl parent;
61     private final InetSocketAddress address;
62     private final int maxNonBarrierMessages;
63     private final long maxBarrierNanos;
64     private final T handler;
65
66     // Accessed concurrently
67     private volatile PipelineState state = PipelineState.IDLE;
68
69     // Updated from netty only
70     private boolean alreadyReading;
71     private boolean barrierTimerEnabled;
72     private long lastBarrierNanos = System.nanoTime();
73     private int nonBarrierMessages;
74     private boolean shuttingDown;
75
76     // Passed to executor to request triggering of flush
77     private final Runnable flushRunnable = new Runnable() {
78         @Override
79         public void run() {
80             flush();
81         }
82     };
83
84     // Passed to executor to request a periodic barrier check
85     private final Runnable barrierRunnable = new Runnable() {
86         @Override
87         public void run() {
88             barrier();
89         }
90     };
91
92     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
93         final int maxNonBarrierMessages, final long maxBarrierNanos) {
94         this.parent = Preconditions.checkNotNull(parent);
95         this.handler = Preconditions.checkNotNull(handler);
96         Preconditions.checkArgument(maxNonBarrierMessages > 0);
97         this.maxNonBarrierMessages = maxNonBarrierMessages;
98         Preconditions.checkArgument(maxBarrierNanos > 0);
99         this.maxBarrierNanos = maxBarrierNanos;
100         this.address = address;
101
102         currentQueue = new StackedOutboundQueue(this);
103         LOG.debug("Queue manager instantiated with queue {}", currentQueue);
104         handler.onConnectionQueueChanged(currentQueue);
105     }
106
107     T getHandler() {
108         return handler;
109     }
110
111     @Override
112     public void close() {
113         handler.onConnectionQueueChanged(null);
114     }
115
116     private void scheduleBarrierTimer(final long now) {
117         long next = lastBarrierNanos + maxBarrierNanos;
118         if (next < now) {
119             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
120             next = now + maxBarrierNanos;
121         }
122
123         final long delay = next - now;
124         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
125         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
126         barrierTimerEnabled = true;
127     }
128
129     private void scheduleBarrierMessage() {
130         final Long xid = currentQueue.reserveBarrierIfNeeded();
131         if (xid == null) {
132             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
133             return;
134         }
135
136         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
137         LOG.trace("Barrier XID {} scheduled", xid);
138     }
139
140
141     /**
142      * Invoked whenever a message comes in from the switch. Runs matching
143      * on all active queues in an attempt to complete a previous request.
144      *
145      * @param message Potential response message
146      * @return True if the message matched a previous request, false otherwise.
147      */
148     boolean onMessage(final OfHeader message) {
149         LOG.trace("Attempting to pair message {} to a request", message);
150
151         return currentQueue.pairRequest(message);
152     }
153
154     private void scheduleFlush() {
155         if (flushScheduled.compareAndSet(false, true)) {
156             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
157             parent.getChannel().eventLoop().execute(flushRunnable);
158         } else {
159             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
160         }
161     }
162
163     /**
164      * Periodic barrier check.
165      */
166     protected void barrier() {
167         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
168         barrierTimerEnabled = false;
169         if (shuttingDown) {
170             LOG.trace("Channel shut down, not processing barrier");
171             return;
172         }
173
174         final long now = System.nanoTime();
175         final long sinceLast = now - lastBarrierNanos;
176         if (sinceLast >= maxBarrierNanos) {
177             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
178             // FIXME: we should be tracking requests/responses instead of this
179             if (nonBarrierMessages == 0) {
180                 LOG.trace("No messages written since last barrier, not issuing one");
181             } else {
182                 scheduleBarrierMessage();
183             }
184         }
185     }
186
187     private void rescheduleFlush() {
188         /*
189          * We are almost ready to terminate. This is a bit tricky, because
190          * we do not want to have a race window where a message would be
191          * stuck on the queue without a flush being scheduled.
192          *
193          * So we mark ourselves as not running and then re-check if a
194          * flush out is needed. That will re-synchronized with other threads
195          * such that only one flush is scheduled at any given time.
196          */
197         if (!flushScheduled.compareAndSet(true, false)) {
198             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
199         }
200
201         conditionalFlush();
202     }
203
204     private void writeAndFlush() {
205         state = PipelineState.WRITING;
206
207         final long start = System.nanoTime();
208
209         final int entries = currentQueue.writeEntries(parent.getChannel(), start);
210         if (entries > 0) {
211             LOG.trace("Flushing channel {}", parent.getChannel());
212             parent.getChannel().flush();
213         }
214
215         if (LOG.isDebugEnabled()) {
216             final long stop = System.nanoTime();
217             LOG.debug("Flushed {} messages to channel {} in {}us", entries,
218                 parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start));
219         }
220
221         state = PipelineState.IDLE;
222     }
223
224     /**
225      * Perform a single flush operation. We keep it here so we do not generate
226      * syntetic accessors for private fields. Otherwise it could be moved into
227      * {@link #flushRunnable}.
228      */
229     protected void flush() {
230         // If the channel is gone, just flush whatever is not completed
231         if (!shuttingDown) {
232             LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
233             writeAndFlush();
234             rescheduleFlush();
235         } else if (currentQueue.finishShutdown()) {
236             handler.onConnectionQueueChanged(null);
237             LOG.debug("Channel {} shutdown complete", parent.getChannel());
238         } else {
239             LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
240             rescheduleFlush();
241         }
242     }
243
244     /**
245      * Schedule a queue flush if it is not empty and the channel is found
246      * to be writable. May only be called from Netty context.
247      */
248     private void conditionalFlush() {
249         if (currentQueue.needsFlush()) {
250             if (shuttingDown || parent.getChannel().isWritable()) {
251                 scheduleFlush();
252             } else {
253                 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
254             }
255         } else {
256             LOG.trace("Queue is empty, no flush needed");
257         }
258     }
259
260     @Override
261     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
262         super.channelActive(ctx);
263         conditionalFlush();
264     }
265
266     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
267         /*
268          * Tune channel write buffering. We increase the writability window
269          * to ensure we can flush an entire queue segment in one go. We definitely
270          * want to keep the difference above 64k, as that will ensure we use jam-packed
271          * TCP packets. UDP will fragment as appropriate.
272          */
273         ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
274         ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
275
276         super.handlerAdded(ctx);
277     }
278
279     @Override
280     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
281         super.channelWritabilityChanged(ctx);
282
283         // The channel is writable again. There may be a flush task on the way, but let's
284         // steal its work, potentially decreasing latency. Since there is a window between
285         // now and when it will run, it may still pick up some more work to do.
286         LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
287         writeAndFlush();
288     }
289
290     @Override
291     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
292         super.channelInactive(ctx);
293
294         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
295
296         shuttingDown = true;
297         final long entries = currentQueue.startShutdown(ctx.channel());
298         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
299
300         scheduleFlush();
301     }
302
303     @Override
304     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
305         // Netty does not provide a 'start reading' callback, so this is our first
306         // (and repeated) chance to detect reading. Since this callback can be invoked
307         // multiple times, we keep a boolean we check. That prevents a volatile write
308         // on repeated invocations. It will be cleared in channelReadComplete().
309         if (!alreadyReading) {
310             alreadyReading = true;
311             state = PipelineState.READING;
312         }
313         super.channelRead(ctx, msg);
314     }
315
316     @Override
317     public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
318         super.channelReadComplete(ctx);
319
320         // Run flush regardless of writability. This is not strictly required, as
321         // there may be a scheduled flush. Instead of canceling it, which is expensive,
322         // we'll steal its work. Note that more work may accumulate in the time window
323         // between now and when the task will run, so it may not be a no-op after all.
324         //
325         // The reason for this is to will the output buffer before we go into selection
326         // phase. This will make sure the pipe is full (in which case our next wake up
327         // will be the queue becoming writable).
328         writeAndFlush();
329     }
330
331     @Override
332     public String toString() {
333         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
334     }
335
336     void ensureFlushing() {
337         // If the channel is not writable, there's no point in waking up,
338         // once we become writable, we will run a full flush
339         if (!parent.getChannel().isWritable()) {
340             return;
341         }
342
343         // We are currently reading something, just a quick sync to ensure we will in fact
344         // flush state.
345         final PipelineState localState = state;
346         LOG.debug("Synchronize on pipeline state {}", localState);
347         switch (localState) {
348         case READING:
349             // Netty thread is currently reading, it will flush the pipeline once it
350             // finishes reading. This is a no-op situation.
351             break;
352         case WRITING:
353         case IDLE:
354         default:
355             // We cannot rely on the change being flushed, schedule a request
356             scheduleFlush();
357         }
358     }
359
360     void onEchoRequest(final EchoRequestMessage message) {
361         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
362         parent.getChannel().writeAndFlush(reply);
363     }
364
365     /**
366      * Write a message into the underlying channel.
367      *
368      * @param now Time reference for 'now'. We take this as an argument, as
369      *            we need a timestamp to mark barrier messages we see swinging
370      *            by. That timestamp does not need to be completely accurate,
371      *            hence we use the flush start time. Alternative would be to
372      *            measure System.nanoTime() for each barrier -- needlessly
373      *            adding overhead.
374      */
375     void writeMessage(final OfHeader message, final long now) {
376         final Object wrapper;
377         if (address == null) {
378             wrapper = new MessageListenerWrapper(message, null);
379         } else {
380             wrapper = new UdpMessageListenerWrapper(message, null, address);
381         }
382         parent.getChannel().write(wrapper);
383
384         if (message instanceof BarrierInput) {
385             LOG.trace("Barrier message seen, resetting counters");
386             nonBarrierMessages = 0;
387             lastBarrierNanos = now;
388         } else {
389             nonBarrierMessages++;
390             if (nonBarrierMessages >= maxNonBarrierMessages) {
391                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
392                 scheduleBarrierMessage();
393             } else if (!barrierTimerEnabled) {
394                 scheduleBarrierTimer(now);
395             }
396         }
397     }
398 }