49c6ddfa9fb47c155796cbdb3d453cd36704c22d
[openflowjava.git] / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractOutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10
11 import com.google.common.base.Preconditions;
12 import io.netty.channel.ChannelHandlerContext;
13 import io.netty.channel.ChannelInboundHandlerAdapter;
14 import io.netty.util.concurrent.Future;
15 import io.netty.util.concurrent.GenericFutureListener;
16 import java.net.InetSocketAddress;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicBoolean;
19 import javax.annotation.Nonnull;
20 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
22 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 /**
29  * Class capsulate basic processing for stacking requests for netty channel
30  * and provide functionality for pairing request/response device message communication.
31  */
32 abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter
33         implements AutoCloseable {
34
35     private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
36
37     private static enum PipelineState {
38         /**
39          * Netty thread is potentially idle, no assumptions
40          * can be made about its state.
41          */
42         IDLE,
43         /**
44          * Netty thread is currently reading, once the read completes,
45          * if will flush the queue in the {@link #WRITING} state.
46          */
47         READING,
48         /**
49          * Netty thread is currently performing a flush on the queue.
50          * It will then transition to {@link #IDLE} state.
51          */
52         WRITING,
53     }
54
55     /**
56      * Default low write watermark. Channel will become writable when number of outstanding
57      * bytes dips below this value.
58      */
59     private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
60
61     /**
62      * Default write high watermark. Channel will become un-writable when number of
63      * outstanding bytes hits this value.
64      */
65     private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
66
67     private final AtomicBoolean flushScheduled = new AtomicBoolean();
68     protected final ConnectionAdapterImpl parent;
69     protected final InetSocketAddress address;
70     protected final StackedOutboundQueue currentQueue;
71     private final T handler;
72
73     // Accessed concurrently
74     private volatile PipelineState state = PipelineState.IDLE;
75
76     // Updated from netty only
77     private boolean alreadyReading;
78     protected boolean shuttingDown;
79
80     // Passed to executor to request triggering of flush
81     protected final Runnable flushRunnable = new Runnable() {
82         @Override
83         public void run() {
84             flush();
85         }
86     };
87
88     AbstractOutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler) {
89         this.parent = Preconditions.checkNotNull(parent);
90         this.handler = Preconditions.checkNotNull(handler);
91         this.address = address;
92         currentQueue = new StackedOutboundQueue(this);
93         LOG.debug("Queue manager instantiated with queue {}", currentQueue);
94
95         handler.onConnectionQueueChanged(currentQueue);
96     }
97
98     @Override
99     public void close() {
100         handler.onConnectionQueueChanged(null);
101     }
102
103     @Override
104     public String toString() {
105         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
106     }
107
108     @Override
109     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
110         /*
111          * Tune channel write buffering. We increase the writability window
112          * to ensure we can flush an entire queue segment in one go. We definitely
113          * want to keep the difference above 64k, as that will ensure we use jam-packed
114          * TCP packets. UDP will fragment as appropriate.
115          */
116         ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
117         ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
118
119         super.handlerAdded(ctx);
120     }
121
122     @Override
123     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
124         super.channelActive(ctx);
125         conditionalFlush();
126     }
127
128     @Override
129     public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
130         super.channelReadComplete(ctx);
131
132         // Run flush regardless of writability. This is not strictly required, as
133         // there may be a scheduled flush. Instead of canceling it, which is expensive,
134         // we'll steal its work. Note that more work may accumulate in the time window
135         // between now and when the task will run, so it may not be a no-op after all.
136         //
137         // The reason for this is to will the output buffer before we go into selection
138         // phase. This will make sure the pipe is full (in which case our next wake up
139         // will be the queue becoming writable).
140         writeAndFlush();
141     }
142
143     @Override
144     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
145         super.channelWritabilityChanged(ctx);
146
147         // The channel is writable again. There may be a flush task on the way, but let's
148         // steal its work, potentially decreasing latency. Since there is a window between
149         // now and when it will run, it may still pick up some more work to do.
150         LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
151         writeAndFlush();
152     }
153
154     @Override
155     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
156         super.channelInactive(ctx);
157
158         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
159
160         shuttingDown = true;
161         final long entries = currentQueue.startShutdown(ctx.channel());
162         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
163
164         scheduleFlush();
165     }
166
167     @Override
168     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
169         // Netty does not provide a 'start reading' callback, so this is our first
170         // (and repeated) chance to detect reading. Since this callback can be invoked
171         // multiple times, we keep a boolean we check. That prevents a volatile write
172         // on repeated invocations. It will be cleared in channelReadComplete().
173         if (!alreadyReading) {
174             alreadyReading = true;
175             state = PipelineState.READING;
176         }
177         super.channelRead(ctx, msg);
178     }
179
180     /**
181      * Invoked whenever a message comes in from the switch. Runs matching
182      * on all active queues in an attempt to complete a previous request.
183      *
184      * @param message Potential response message
185      * @return True if the message matched a previous request, false otherwise.
186      */
187     boolean onMessage(final OfHeader message) {
188         LOG.trace("Attempting to pair message {} to a request", message);
189
190         return currentQueue.pairRequest(message);
191     }
192
193     T getHandler() {
194         return handler;
195     }
196
197     void ensureFlushing() {
198         // If the channel is not writable, there's no point in waking up,
199         // once we become writable, we will run a full flush
200         if (!parent.getChannel().isWritable()) {
201             return;
202         }
203
204         // We are currently reading something, just a quick sync to ensure we will in fact
205         // flush state.
206         final PipelineState localState = state;
207         LOG.debug("Synchronize on pipeline state {}", localState);
208         switch (localState) {
209         case READING:
210             // Netty thread is currently reading, it will flush the pipeline once it
211             // finishes reading. This is a no-op situation.
212             break;
213         case WRITING:
214         case IDLE:
215         default:
216             // We cannot rely on the change being flushed, schedule a request
217             scheduleFlush();
218         }
219     }
220
221     /**
222      * Method immediately response on Echo message.
223      *
224      * @param message incoming Echo message from device
225      */
226     void onEchoRequest(final EchoRequestMessage message) {
227         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
228                 .setVersion(message.getVersion()).setXid(message.getXid()).build();
229         parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply));
230     }
231
232     /**
233      * Wraps outgoing message and includes listener attached to this message
234      * which is send to OFEncoder for serialization. Correct wrapper is
235      * selected by communication pipeline.
236      *
237      * @param message
238      * @param now
239      */
240     void writeMessage(final OfHeader message, final long now) {
241         final Object wrapper = makeMessageListenerWrapper(message);
242         parent.getChannel().write(wrapper);
243     }
244
245     /**
246      * Wraps outgoing message and includes listener attached to this message
247      * which is send to OFEncoder for serialization. Correct wrapper is
248      * selected by communication pipeline.
249      *
250      * @return
251      */
252     private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
253         Preconditions.checkArgument(msg != null);
254
255         if (address == null) {
256             return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
257         }
258         return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address);
259     }
260
261     /* NPE are coming from {@link OFEncoder#encode} from catch block and we don't wish to lost it */
262     private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = new GenericFutureListener<Future<Void>>() {
263
264         private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener");
265
266         @Override
267         public void operationComplete(final Future<Void> future) throws Exception {
268             if (future.cause() != null) {
269                 LOGGER.warn("Message encoding fail !", future.cause());
270             }
271         }
272     };
273
274     /**
275      * Perform a single flush operation. We keep it here so we do not generate
276      * syntetic accessors for private fields. Otherwise it could be moved into {@link #flushRunnable}.
277      */
278     protected void flush() {
279         // If the channel is gone, just flush whatever is not completed
280         if (!shuttingDown) {
281             LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
282             writeAndFlush();
283             rescheduleFlush();
284         } else if (currentQueue.finishShutdown()) {
285             close();
286             LOG.debug("Channel {} shutdown complete", parent.getChannel());
287         } else {
288             LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
289             rescheduleFlush();
290         }
291     }
292
293     private void scheduleFlush() {
294         if (flushScheduled.compareAndSet(false, true)) {
295             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
296             parent.getChannel().eventLoop().execute(flushRunnable);
297         } else {
298             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
299         }
300     }
301
302     private void writeAndFlush() {
303         state = PipelineState.WRITING;
304
305         final long start = System.nanoTime();
306
307         final int entries = currentQueue.writeEntries(parent.getChannel(), start);
308         if (entries > 0) {
309             LOG.trace("Flushing channel {}", parent.getChannel());
310             parent.getChannel().flush();
311         }
312
313         if (LOG.isDebugEnabled()) {
314             final long stop = System.nanoTime();
315             LOG.debug("Flushed {} messages to channel {} in {}us", entries, parent.getChannel(),
316                     TimeUnit.NANOSECONDS.toMicros(stop - start));
317         }
318
319         state = PipelineState.IDLE;
320     }
321
322     private void rescheduleFlush() {
323         /*
324          * We are almost ready to terminate. This is a bit tricky, because
325          * we do not want to have a race window where a message would be
326          * stuck on the queue without a flush being scheduled.
327          * So we mark ourselves as not running and then re-check if a
328          * flush out is needed. That will re-synchronized with other threads
329          * such that only one flush is scheduled at any given time.
330          */
331         if (!flushScheduled.compareAndSet(true, false)) {
332             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
333         }
334
335         conditionalFlush();
336     }
337
338     /**
339      * Schedule a queue flush if it is not empty and the channel is found
340      * to be writable. May only be called from Netty context.
341      */
342     private void conditionalFlush() {
343         if (currentQueue.needsFlush()) {
344             if (shuttingDown || parent.getChannel().isWritable()) {
345                 scheduleFlush();
346             } else {
347                 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
348             }
349         } else {
350             LOG.trace("Queue is empty, no flush needed");
351         }
352     }
353 }