BUG-4072: Echo reply missing
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / OutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelFuture;
12 import io.netty.channel.ChannelHandlerContext;
13 import io.netty.channel.ChannelInboundHandlerAdapter;
14 import io.netty.util.concurrent.Future;
15 import io.netty.util.concurrent.GenericFutureListener;
16 import java.net.InetSocketAddress;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
29     private static enum PipelineState {
30         /**
31          * Netty thread is potentially idle, no assumptions
32          * can be made about its state.
33          */
34         IDLE,
35         /**
36          * Netty thread is currently reading, once the read completes,
37          * if will flush the queue in the {@link #FLUSHING} state.
38          */
39         READING,
40         /**
41          * Netty thread is currently performing a flush on the queue.
42          * It will then transition to {@link #IDLE} state.
43          */
44         WRITING,
45     }
46
47     private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
48
49     /**
50      * Default low write watermark. Channel will become writable when number of outstanding
51      * bytes dips below this value.
52      */
53     private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
54
55     /**
56      * Default write high watermark. Channel will become un-writable when number of
57      * outstanding bytes hits this value.
58      */
59     private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
60
61     private final AtomicBoolean flushScheduled = new AtomicBoolean();
62     private final StackedOutboundQueue currentQueue;
63     private final ConnectionAdapterImpl parent;
64     private final InetSocketAddress address;
65     private final int maxNonBarrierMessages;
66     private final long maxBarrierNanos;
67     private final T handler;
68
69     // Accessed concurrently
70     private volatile PipelineState state = PipelineState.IDLE;
71
72     // Updated from netty only
73     private boolean alreadyReading;
74     private boolean barrierTimerEnabled;
75     private long lastBarrierNanos = System.nanoTime();
76     private int nonBarrierMessages;
77     private boolean shuttingDown;
78
79     // Passed to executor to request triggering of flush
80     private final Runnable flushRunnable = new Runnable() {
81         @Override
82         public void run() {
83             flush();
84         }
85     };
86
87     // Passed to executor to request a periodic barrier check
88     private final Runnable barrierRunnable = new Runnable() {
89         @Override
90         public void run() {
91             barrier();
92         }
93     };
94
95     OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
96         final int maxNonBarrierMessages, final long maxBarrierNanos) {
97         this.parent = Preconditions.checkNotNull(parent);
98         this.handler = Preconditions.checkNotNull(handler);
99         Preconditions.checkArgument(maxNonBarrierMessages > 0);
100         this.maxNonBarrierMessages = maxNonBarrierMessages;
101         Preconditions.checkArgument(maxBarrierNanos > 0);
102         this.maxBarrierNanos = maxBarrierNanos;
103         this.address = address;
104
105         currentQueue = new StackedOutboundQueue(this);
106         LOG.debug("Queue manager instantiated with queue {}", currentQueue);
107         handler.onConnectionQueueChanged(currentQueue);
108     }
109
110     T getHandler() {
111         return handler;
112     }
113
114     @Override
115     public void close() {
116         handler.onConnectionQueueChanged(null);
117     }
118
119     private void scheduleBarrierTimer(final long now) {
120         long next = lastBarrierNanos + maxBarrierNanos;
121         if (next < now) {
122             LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
123             next = now + maxBarrierNanos;
124         }
125
126         final long delay = next - now;
127         LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
128         parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
129         barrierTimerEnabled = true;
130     }
131
132     private void scheduleBarrierMessage() {
133         final Long xid = currentQueue.reserveBarrierIfNeeded();
134         if (xid == null) {
135             LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
136             return;
137         }
138
139         currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
140         LOG.trace("Barrier XID {} scheduled", xid);
141     }
142
143
144     /**
145      * Invoked whenever a message comes in from the switch. Runs matching
146      * on all active queues in an attempt to complete a previous request.
147      *
148      * @param message Potential response message
149      * @return True if the message matched a previous request, false otherwise.
150      */
151     boolean onMessage(final OfHeader message) {
152         LOG.trace("Attempting to pair message {} to a request", message);
153
154         return currentQueue.pairRequest(message);
155     }
156
157     private void scheduleFlush() {
158         if (flushScheduled.compareAndSet(false, true)) {
159             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
160             parent.getChannel().eventLoop().execute(flushRunnable);
161         } else {
162             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
163         }
164     }
165
166     /**
167      * Periodic barrier check.
168      */
169     protected void barrier() {
170         LOG.debug("Channel {} barrier timer expired", parent.getChannel());
171         barrierTimerEnabled = false;
172         if (shuttingDown) {
173             LOG.trace("Channel shut down, not processing barrier");
174             return;
175         }
176
177         final long now = System.nanoTime();
178         final long sinceLast = now - lastBarrierNanos;
179         if (sinceLast >= maxBarrierNanos) {
180             LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
181             // FIXME: we should be tracking requests/responses instead of this
182             if (nonBarrierMessages == 0) {
183                 LOG.trace("No messages written since last barrier, not issuing one");
184             } else {
185                 scheduleBarrierMessage();
186             }
187         }
188     }
189
190     private void rescheduleFlush() {
191         /*
192          * We are almost ready to terminate. This is a bit tricky, because
193          * we do not want to have a race window where a message would be
194          * stuck on the queue without a flush being scheduled.
195          *
196          * So we mark ourselves as not running and then re-check if a
197          * flush out is needed. That will re-synchronized with other threads
198          * such that only one flush is scheduled at any given time.
199          */
200         if (!flushScheduled.compareAndSet(true, false)) {
201             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
202         }
203
204         conditionalFlush();
205     }
206
207     private void writeAndFlush() {
208         state = PipelineState.WRITING;
209
210         final long start = System.nanoTime();
211
212         final int entries = currentQueue.writeEntries(parent.getChannel(), start);
213         if (entries > 0) {
214             LOG.trace("Flushing channel {}", parent.getChannel());
215             parent.getChannel().flush();
216         }
217
218         if (LOG.isDebugEnabled()) {
219             final long stop = System.nanoTime();
220             LOG.debug("Flushed {} messages to channel {} in {}us", entries,
221                 parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start));
222         }
223
224         state = PipelineState.IDLE;
225     }
226
227     /**
228      * Perform a single flush operation. We keep it here so we do not generate
229      * syntetic accessors for private fields. Otherwise it could be moved into
230      * {@link #flushRunnable}.
231      */
232     protected void flush() {
233         // If the channel is gone, just flush whatever is not completed
234         if (!shuttingDown) {
235             LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
236             writeAndFlush();
237             rescheduleFlush();
238         } else if (currentQueue.finishShutdown()) {
239             handler.onConnectionQueueChanged(null);
240             LOG.debug("Channel {} shutdown complete", parent.getChannel());
241         } else {
242             LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
243             rescheduleFlush();
244         }
245     }
246
247     /**
248      * Schedule a queue flush if it is not empty and the channel is found
249      * to be writable. May only be called from Netty context.
250      */
251     private void conditionalFlush() {
252         if (currentQueue.needsFlush()) {
253             if (shuttingDown || parent.getChannel().isWritable()) {
254                 scheduleFlush();
255             } else {
256                 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
257             }
258         } else {
259             LOG.trace("Queue is empty, no flush needed");
260         }
261     }
262
263     @Override
264     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
265         super.channelActive(ctx);
266         conditionalFlush();
267     }
268
269     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
270         /*
271          * Tune channel write buffering. We increase the writability window
272          * to ensure we can flush an entire queue segment in one go. We definitely
273          * want to keep the difference above 64k, as that will ensure we use jam-packed
274          * TCP packets. UDP will fragment as appropriate.
275          */
276         ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
277         ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
278
279         super.handlerAdded(ctx);
280     }
281
282     @Override
283     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
284         super.channelWritabilityChanged(ctx);
285
286         // The channel is writable again. There may be a flush task on the way, but let's
287         // steal its work, potentially decreasing latency. Since there is a window between
288         // now and when it will run, it may still pick up some more work to do.
289         LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
290         writeAndFlush();
291     }
292
293     @Override
294     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
295         super.channelInactive(ctx);
296
297         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
298
299         shuttingDown = true;
300         final long entries = currentQueue.startShutdown(ctx.channel());
301         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
302
303         scheduleFlush();
304     }
305
306     @Override
307     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
308         // Netty does not provide a 'start reading' callback, so this is our first
309         // (and repeated) chance to detect reading. Since this callback can be invoked
310         // multiple times, we keep a boolean we check. That prevents a volatile write
311         // on repeated invocations. It will be cleared in channelReadComplete().
312         if (!alreadyReading) {
313             alreadyReading = true;
314             state = PipelineState.READING;
315         }
316         super.channelRead(ctx, msg);
317     }
318
319     @Override
320     public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
321         super.channelReadComplete(ctx);
322
323         // Run flush regardless of writability. This is not strictly required, as
324         // there may be a scheduled flush. Instead of canceling it, which is expensive,
325         // we'll steal its work. Note that more work may accumulate in the time window
326         // between now and when the task will run, so it may not be a no-op after all.
327         //
328         // The reason for this is to will the output buffer before we go into selection
329         // phase. This will make sure the pipe is full (in which case our next wake up
330         // will be the queue becoming writable).
331         writeAndFlush();
332     }
333
334     @Override
335     public String toString() {
336         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
337     }
338
339     void ensureFlushing() {
340         // If the channel is not writable, there's no point in waking up,
341         // once we become writable, we will run a full flush
342         if (!parent.getChannel().isWritable()) {
343             return;
344         }
345
346         // We are currently reading something, just a quick sync to ensure we will in fact
347         // flush state.
348         final PipelineState localState = state;
349         LOG.debug("Synchronize on pipeline state {}", localState);
350         switch (localState) {
351         case READING:
352             // Netty thread is currently reading, it will flush the pipeline once it
353             // finishes reading. This is a no-op situation.
354             break;
355         case WRITING:
356         case IDLE:
357         default:
358             // We cannot rely on the change being flushed, schedule a request
359             scheduleFlush();
360         }
361     }
362
363     void onEchoRequest(final EchoRequestMessage message) {
364         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
365                 .setVersion(message.getVersion()).setXid(message.getXid()).build();
366         final SimpleRpcListener simpleMsgRpcListener = new SimpleRpcListener(reply, "echo-reply sending failed");
367
368         final GenericFutureListener<Future<Void>> msgProcessListener = simpleMsgRpcListener.takeListener();
369         final Object messageListenerWrapper;
370         if (address == null) {
371             messageListenerWrapper = new MessageListenerWrapper(simpleMsgRpcListener.takeMessage(), msgProcessListener);
372         } else {
373             messageListenerWrapper = new UdpMessageListenerWrapper(simpleMsgRpcListener.takeMessage(), msgProcessListener, address);
374         }
375         final ChannelFuture channelFuture = parent.getChannel().writeAndFlush(messageListenerWrapper);
376         if (msgProcessListener != null) {
377             channelFuture.addListener(msgProcessListener);
378         }
379     }
380
381     /**
382      * Write a message into the underlying channel.
383      *
384      * @param now Time reference for 'now'. We take this as an argument, as
385      *            we need a timestamp to mark barrier messages we see swinging
386      *            by. That timestamp does not need to be completely accurate,
387      *            hence we use the flush start time. Alternative would be to
388      *            measure System.nanoTime() for each barrier -- needlessly
389      *            adding overhead.
390      */
391     void writeMessage(final OfHeader message, final long now) {
392         final Object wrapper;
393         if (address == null) {
394             wrapper = new MessageListenerWrapper(message, null);
395         } else {
396             wrapper = new UdpMessageListenerWrapper(message, null, address);
397         }
398         parent.getChannel().write(wrapper);
399
400         if (message instanceof BarrierInput) {
401             LOG.trace("Barrier message seen, resetting counters");
402             nonBarrierMessages = 0;
403             lastBarrierNanos = now;
404         } else {
405             nonBarrierMessages++;
406             if (nonBarrierMessages >= maxNonBarrierMessages) {
407                 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
408                 scheduleBarrierMessage();
409             } else if (!barrierTimerEnabled) {
410                 scheduleBarrierTimer(now);
411             }
412         }
413     }
414 }