Implement a segmented OutboundQueue
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelInboundHandlerAdapter;
13 import java.net.InetSocketAddress;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.atomic.AtomicBoolean;
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
26     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
27
28     /**
29      * Default low write watermark. Channel will become writable when number of outstanding
30      * bytes dips below this value.
31      */
32     private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
33
34     /**
35      * Default write high watermark. Channel will become un-writable when number of
36      * outstanding bytes hits this value.
37      */
38     private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
39
40     private final AtomicBoolean flushScheduled = new AtomicBoolean();
41     private final StackedOutboundQueue currentQueue;
42     private final ConnectionAdapterImpl parent;
43     private final InetSocketAddress address;
44     private final int maxNonBarrierMessages;
45     private final long maxBarrierNanos;
46     private final T handler;
47
48     // Accessed concurrently
49     private volatile boolean reading;
50
51     // Updated from netty only
52     private boolean alreadyReading;
53     private boolean barrierTimerEnabled;
54     private long lastBarrierNanos = System.nanoTime();
55     private int nonBarrierMessages;
56     private boolean shuttingDown;
57
58     // Passed to executor to request triggering of flush
59     private final Runnable flushRunnable = new Runnable() {
60         @Override
61         public void run() {
62             flush();
63         }
64     };
65
66     // Passed to executor to request a periodic barrier check
67     private final Runnable barrierRunnable = new Runnable() {
68         @Override
69         public void run() {
70             barrier();
71         }
72     };
73
74     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
75         final int maxNonBarrierMessages, final long maxBarrierNanos) {
76         this.parent = Preconditions.checkNotNull(parent);
77         this.handler = Preconditions.checkNotNull(handler);
78         Preconditions.checkArgument(maxNonBarrierMessages > 0);
79         this.maxNonBarrierMessages = maxNonBarrierMessages;
80         Preconditions.checkArgument(maxBarrierNanos > 0);
81         this.maxBarrierNanos = maxBarrierNanos;
82         this.address = address;
83
84         currentQueue = new StackedOutboundQueue(this);
85         LOG.debug("Queue manager instantiated with queue {}", currentQueue);
86         handler.onConnectionQueueChanged(currentQueue);
87     }
88
89     T getHandler() {
90         return handler;
91     }
92
93     @Override
94     public void close() {
95         handler.onConnectionQueueChanged(null);
96     }
97
98     private void scheduleBarrierTimer(final long now) {
99         long next = lastBarrierNanos + maxBarrierNanos;
100         if (next < now) {
101             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
102             next = now + maxBarrierNanos;
103         }
104
105         final long delay = next - now;
106         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
107         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
108         barrierTimerEnabled = true;
109     }
110
111     private void scheduleBarrierMessage() {
112         final Long xid = currentQueue.reserveBarrierIfNeeded();
113         if (xid == null) {
114             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
115             return;
116         }
117
118         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
119         LOG.trace("Barrier XID {} scheduled", xid);
120     }
121
122
123     /**
124      * Invoked whenever a message comes in from the switch. Runs matching
125      * on all active queues in an attempt to complete a previous request.
126      *
127      * @param message Potential response message
128      * @return True if the message matched a previous request, false otherwise.
129      */
130     boolean onMessage(final OfHeader message) {
131         LOG.trace("Attempting to pair message {} to a request", message);
132
133         return currentQueue.pairRequest(message);
134     }
135
136     private void scheduleFlush() {
137         if (flushScheduled.compareAndSet(false, true)) {
138             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
139             parent.getChannel().eventLoop().execute(flushRunnable);
140         } else {
141             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
142         }
143     }
144
145     /**
146      * Periodic barrier check.
147      */
148     protected void barrier() {
149         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
150         barrierTimerEnabled = false;
151         if (shuttingDown) {
152             LOG.trace("Channel shut down, not processing barrier");
153             return;
154         }
155
156         final long now = System.nanoTime();
157         final long sinceLast = now - lastBarrierNanos;
158         if (sinceLast >= maxBarrierNanos) {
159             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
160             // FIXME: we should be tracking requests/responses instead of this
161             if (nonBarrierMessages == 0) {
162                 LOG.trace("No messages written since last barrier, not issuing one");
163             } else {
164                 scheduleBarrierMessage();
165             }
166         }
167     }
168
169     private void rescheduleFlush() {
170         /*
171          * We are almost ready to terminate. This is a bit tricky, because
172          * we do not want to have a race window where a message would be
173          * stuck on the queue without a flush being scheduled.
174          *
175          * So we mark ourselves as not running and then re-check if a
176          * flush out is needed. That will re-synchronized with other threads
177          * such that only one flush is scheduled at any given time.
178          */
179         if (!flushScheduled.compareAndSet(true, false)) {
180             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
181         }
182
183         conditionalFlush();
184     }
185
186     private void writeAndFlush() {
187         final long start = System.nanoTime();
188
189         final int entries = currentQueue.writeEntries(parent.getChannel(), start);
190         if (entries > 0) {
191             LOG.debug("Flushing channel {}", parent.getChannel());
192             parent.getChannel().flush();
193         }
194
195         if (LOG.isDebugEnabled()) {
196             final long stop = System.nanoTime();
197             LOG.debug("Flushed {} messages to channel {} in {}us", entries,
198                 parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start));
199         }
200     }
201
202     /**
203      * Perform a single flush operation. We keep it here so we do not generate
204      * syntetic accessors for private fields. Otherwise it could be moved into
205      * {@link #flushRunnable}.
206      */
207     protected void flush() {
208         // If the channel is gone, just flush whatever is not completed
209         if (!shuttingDown) {
210             LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
211             writeAndFlush();
212             rescheduleFlush();
213         } else if (currentQueue.finishShutdown()) {
214             handler.onConnectionQueueChanged(null);
215             LOG.debug("Channel {} shutdown complete", parent.getChannel());
216         } else {
217             LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
218             rescheduleFlush();
219         }
220     }
221
222     /**
223      * Schedule a queue flush if it is not empty and the channel is found
224      * to be writable. May only be called from Netty context.
225      */
226     private void conditionalFlush() {
227         if (currentQueue.needsFlush()) {
228             if (shuttingDown || parent.getChannel().isWritable()) {
229                 scheduleFlush();
230             } else {
231                 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
232             }
233         } else {
234             LOG.trace("Queue is empty, no flush needed");
235         }
236     }
237
238     @Override
239     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
240         super.channelActive(ctx);
241         conditionalFlush();
242     }
243
244     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
245         /*
246          * Tune channel write buffering. We increase the writability window
247          * to ensure we can flush an entire queue segment in one go. We definitely
248          * want to keep the difference above 64k, as that will ensure we use jam-packed
249          * TCP packets. UDP will fragment as appropriate.
250          */
251         ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
252         ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
253
254         super.handlerAdded(ctx);
255     }
256
257     @Override
258     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
259         super.channelWritabilityChanged(ctx);
260
261         // A simple trade-off. While we could write things right away, if there is a task
262         // schedule, let it have the work
263         if (flushScheduled.compareAndSet(false, true)) {
264             LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
265             flush();
266         } else {
267             LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
268         }
269     }
270
271     @Override
272     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
273         super.channelInactive(ctx);
274
275         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
276
277         shuttingDown = true;
278         final long entries = currentQueue.startShutdown(ctx.channel());
279         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
280
281         scheduleFlush();
282     }
283
284     @Override
285     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
286         // non-volatile read if we are called multiple times
287         if (!alreadyReading) {
288             alreadyReading = true;
289             reading = true;
290         }
291         super.channelRead(ctx, msg);
292     }
293
294     @Override
295     public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
296         super.channelReadComplete(ctx);
297         alreadyReading = false;
298         reading = false;
299
300         // TODO: model this as an atomic gate. We need to sync on it to make sure
301         //       that ensureFlushing() suppresses scheudling only if this barrier
302         //       has not been crossed.
303         synchronized (this) {
304             // Run flush regardless of writability. This is not strictly required, as
305             // there may be a scheduled flush. Instead of canceling it, which is expensive,
306             // we'll steal its work. Note that more work may accumulate in the time window
307             // between now and when the task will run, so it may not be a no-op after all.
308             //
309             // The reason for this is to will the output buffer before we go into selection
310             // phase. This will make sure the pipe is full (in which case our next wake up
311             // will be the queue becoming writable).
312             writeAndFlush();
313         }
314
315         LOG.debug("Opportunistic write on channel {}", parent.getChannel());
316         writeAndFlush();
317     }
318
319     @Override
320     public String toString() {
321         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
322     }
323
324     void ensureFlushing() {
325         // If the channel is not writable, there's no point in waking up,
326         // once we become writable, we will run a full flush
327         if (!parent.getChannel().isWritable()) {
328             return;
329         }
330
331         // We are currently reading something, just a quick sync to ensure we will in fact
332         // flush state.
333         if (reading) {
334             synchronized (this) {
335                 if (reading) {
336                     return;
337                 }
338             }
339         }
340
341         // Netty thread is outside our code, we need to schedule a flush
342         // to re-synchronize.
343         scheduleFlush();
344     }
345
346     void onEchoRequest(final EchoRequestMessage message) {
347         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
348         parent.getChannel().writeAndFlush(reply);
349     }
350
351     /**
352      * Write a message into the underlying channel.
353      *
354      * @param now Time reference for 'now'. We take this as an argument, as
355      *            we need a timestamp to mark barrier messages we see swinging
356      *            by. That timestamp does not need to be completely accurate,
357      *            hence we use the flush start time. Alternative would be to
358      *            measure System.nanoTime() for each barrier -- needlessly
359      *            adding overhead.
360      */
361     void writeMessage(final OfHeader message, final long now) {
362         final Object wrapper;
363         if (address == null) {
364             wrapper = new MessageListenerWrapper(message, null);
365         } else {
366             wrapper = new UdpMessageListenerWrapper(message, null, address);
367         }
368         parent.getChannel().write(wrapper);
369
370         if (message instanceof BarrierInput) {
371             LOG.trace("Barrier message seen, resetting counters");
372             nonBarrierMessages = 0;
373             lastBarrierNanos = now;
374         } else {
375             nonBarrierMessages++;
376             if (nonBarrierMessages >= maxNonBarrierMessages) {
377                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
378                 scheduleBarrierMessage();
379             } else if (!barrierTimerEnabled) {
380                 scheduleBarrierTimer(now);
381             }
382         }
383     }
384 }