520d145d87523872d422597c096e24487fdbf164
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractOutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10
11 import com.google.common.base.Preconditions;
12 import io.netty.channel.ChannelHandlerContext;
13 import io.netty.channel.ChannelInboundHandlerAdapter;
14 import io.netty.util.concurrent.Future;
15 import io.netty.util.concurrent.GenericFutureListener;
16 import java.net.InetSocketAddress;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import javax.annotation.Nonnull;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * Class capsulate basic processing for stacking requests for netty channel
30  * and provide functionality for pairing request/response device message communication.
31  */
32 abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O extends AbstractStackedOutboundQueue>
33         extends ChannelInboundHandlerAdapter
34         implements AutoCloseable {
35
36     private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
37
38     private static enum PipelineState {
39         /**
40          * Netty thread is potentially idle, no assumptions
41          * can be made about its state.
42          */
43         IDLE,
44         /**
45          * Netty thread is currently reading, once the read completes,
46          * if will flush the queue in the {@link #WRITING} state.
47          */
48         READING,
49         /**
50          * Netty thread is currently performing a flush on the queue.
51          * It will then transition to {@link #IDLE} state.
52          */
53         WRITING,
54     }
55
56     /**
57      * Default low write watermark. Channel will become writable when number of outstanding
58      * bytes dips below this value.
59      */
60     private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
61
62     /**
63      * Default write high watermark. Channel will become un-writable when number of
64      * outstanding bytes hits this value.
65      */
66     private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
67
68     private final AtomicBoolean flushScheduled = new AtomicBoolean();
69     protected final ConnectionAdapterImpl parent;
70     protected final InetSocketAddress address;
71     protected final O currentQueue;
72     private final T handler;
73
74     // Accessed concurrently
75     private volatile PipelineState state = PipelineState.IDLE;
76
77     // Updated from netty only
78     private boolean alreadyReading;
79     protected boolean shuttingDown;
80
81     // Passed to executor to request triggering of flush
82     protected final Runnable flushRunnable = new Runnable() {
83         @Override
84         public void run() {
85             flush();
86         }
87     };
88
89     AbstractOutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler) {
90         this.parent = Preconditions.checkNotNull(parent);
91         this.handler = Preconditions.checkNotNull(handler);
92         this.address = address;
93         /* Note: don't wish to use reflection here */
94         currentQueue = initializeStackedOutboudnqueue();
95         LOG.debug("Queue manager instantiated with queue {}", currentQueue);
96
97         handler.onConnectionQueueChanged(currentQueue);
98     }
99
100     /**
101      * Method has to initialize some child of {@link AbstractStackedOutboundQueue}
102      *
103      * @return correct implementation of StacketOutboundqueue
104      */
105     protected abstract O initializeStackedOutboudnqueue();
106
107     @Override
108     public void close() {
109         handler.onConnectionQueueChanged(null);
110     }
111
112     @Override
113     public String toString() {
114         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
115     }
116
117     @Override
118     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
119         /*
120          * Tune channel write buffering. We increase the writability window
121          * to ensure we can flush an entire queue segment in one go. We definitely
122          * want to keep the difference above 64k, as that will ensure we use jam-packed
123          * TCP packets. UDP will fragment as appropriate.
124          */
125         ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
126         ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
127
128         super.handlerAdded(ctx);
129     }
130
131     @Override
132     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
133         super.channelActive(ctx);
134         conditionalFlush();
135     }
136
137     @Override
138     public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
139         super.channelReadComplete(ctx);
140
141         // Run flush regardless of writability. This is not strictly required, as
142         // there may be a scheduled flush. Instead of canceling it, which is expensive,
143         // we'll steal its work. Note that more work may accumulate in the time window
144         // between now and when the task will run, so it may not be a no-op after all.
145         //
146         // The reason for this is to will the output buffer before we go into selection
147         // phase. This will make sure the pipe is full (in which case our next wake up
148         // will be the queue becoming writable).
149         writeAndFlush();
150     }
151
152     @Override
153     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
154         super.channelWritabilityChanged(ctx);
155
156         // The channel is writable again. There may be a flush task on the way, but let's
157         // steal its work, potentially decreasing latency. Since there is a window between
158         // now and when it will run, it may still pick up some more work to do.
159         LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
160         writeAndFlush();
161     }
162
163     @Override
164     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
165         super.channelInactive(ctx);
166
167         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
168
169         shuttingDown = true;
170         final long entries = currentQueue.startShutdown(ctx.channel());
171         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
172
173         scheduleFlush();
174     }
175
176     @Override
177     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
178         // Netty does not provide a 'start reading' callback, so this is our first
179         // (and repeated) chance to detect reading. Since this callback can be invoked
180         // multiple times, we keep a boolean we check. That prevents a volatile write
181         // on repeated invocations. It will be cleared in channelReadComplete().
182         if (!alreadyReading) {
183             alreadyReading = true;
184             state = PipelineState.READING;
185         }
186         super.channelRead(ctx, msg);
187     }
188
189     /**
190      * Invoked whenever a message comes in from the switch. Runs matching
191      * on all active queues in an attempt to complete a previous request.
192      *
193      * @param message Potential response message
194      * @return True if the message matched a previous request, false otherwise.
195      */
196     boolean onMessage(final OfHeader message) {
197         LOG.trace("Attempting to pair message {} to a request", message);
198
199         return currentQueue.pairRequest(message);
200     }
201
202     T getHandler() {
203         return handler;
204     }
205
206     void ensureFlushing() {
207         // If the channel is not writable, there's no point in waking up,
208         // once we become writable, we will run a full flush
209         if (!parent.getChannel().isWritable()) {
210             return;
211         }
212
213         // We are currently reading something, just a quick sync to ensure we will in fact
214         // flush state.
215         final PipelineState localState = state;
216         LOG.debug("Synchronize on pipeline state {}", localState);
217         switch (localState) {
218         case READING:
219             // Netty thread is currently reading, it will flush the pipeline once it
220             // finishes reading. This is a no-op situation.
221             break;
222         case WRITING:
223         case IDLE:
224         default:
225             // We cannot rely on the change being flushed, schedule a request
226             scheduleFlush();
227         }
228     }
229
230     /**
231      * Method immediately response on Echo message.
232      *
233      * @param message incoming Echo message from device
234      */
235     void onEchoRequest(final EchoRequestMessage message) {
236         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
237                 .setVersion(message.getVersion()).setXid(message.getXid()).build();
238         parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply));
239     }
240
241     /**
242      * Wraps outgoing message and includes listener attached to this message
243      * which is send to OFEncoder for serialization. Correct wrapper is
244      * selected by communication pipeline.
245      *
246      * @param message
247      * @param now
248      */
249     void writeMessage(final OfHeader message, final long now) {
250         final Object wrapper = makeMessageListenerWrapper(message);
251         parent.getChannel().write(wrapper);
252     }
253
254     /**
255      * Wraps outgoing message and includes listener attached to this message
256      * which is send to OFEncoder for serialization. Correct wrapper is
257      * selected by communication pipeline.
258      *
259      * @return
260      */
261     protected Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
262         Preconditions.checkArgument(msg != null);
263
264         if (address == null) {
265             return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
266         }
267         return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address);
268     }
269
270     /* NPE are coming from {@link OFEncoder#encode} from catch block and we don't wish to lost it */
271     private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = new GenericFutureListener<Future<Void>>() {
272
273         private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener");
274
275         @Override
276         public void operationComplete(final Future<Void> future) throws Exception {
277             if (future.cause() != null) {
278                 LOGGER.warn("Message encoding fail !", future.cause());
279             }
280         }
281     };
282
283     /**
284      * Perform a single flush operation. We keep it here so we do not generate
285      * syntetic accessors for private fields. Otherwise it could be moved into {@link #flushRunnable}.
286      */
287     protected void flush() {
288         // If the channel is gone, just flush whatever is not completed
289         if (!shuttingDown) {
290             LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
291             writeAndFlush();
292             rescheduleFlush();
293         } else if (currentQueue.finishShutdown()) {
294             close();
295             LOG.debug("Channel {} shutdown complete", parent.getChannel());
296         } else {
297             LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
298             rescheduleFlush();
299         }
300     }
301
302     private void scheduleFlush() {
303         if (flushScheduled.compareAndSet(false, true)) {
304             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
305             parent.getChannel().eventLoop().execute(flushRunnable);
306         } else {
307             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
308         }
309     }
310
311     private void writeAndFlush() {
312         state = PipelineState.WRITING;
313
314         final long start = System.nanoTime();
315
316         final int entries = currentQueue.writeEntries(parent.getChannel(), start);
317         if (entries > 0) {
318             LOG.trace("Flushing channel {}", parent.getChannel());
319             parent.getChannel().flush();
320         }
321
322         if (LOG.isDebugEnabled()) {
323             final long stop = System.nanoTime();
324             LOG.debug("Flushed {} messages to channel {} in {}us", entries, parent.getChannel(),
325                     TimeUnit.NANOSECONDS.toMicros(stop - start));
326         }
327
328         state = PipelineState.IDLE;
329     }
330
331     private void rescheduleFlush() {
332         /*
333          * We are almost ready to terminate. This is a bit tricky, because
334          * we do not want to have a race window where a message would be
335          * stuck on the queue without a flush being scheduled.
336          * So we mark ourselves as not running and then re-check if a
337          * flush out is needed. That will re-synchronized with other threads
338          * such that only one flush is scheduled at any given time.
339          */
340         if (!flushScheduled.compareAndSet(true, false)) {
341             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
342         }
343
344         conditionalFlush();
345     }
346
347     /**
348      * Schedule a queue flush if it is not empty and the channel is found
349      * to be writable. May only be called from Netty context.
350      */
351     private void conditionalFlush() {
352         if (currentQueue.needsFlush()) {
353             if (shuttingDown || parent.getChannel().isWritable()) {
354                 scheduleFlush();
355             } else {
356                 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
357             }
358         } else {
359             LOG.trace("Queue is empty, no flush needed");
360         }
361     }
362 }