Bump MRI upstreams
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractOutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
14 import io.netty.channel.ChannelHandlerContext;
15 import io.netty.channel.ChannelInboundHandlerAdapter;
16 import io.netty.util.concurrent.Future;
17 import io.netty.util.concurrent.GenericFutureListener;
18 import java.math.BigInteger;
19 import java.net.InetSocketAddress;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
27 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 /**
32  * Class capsulate basic processing for stacking requests for netty channel
33  * and provide functionality for pairing request/response device message communication.
34  */
35 abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O extends AbstractStackedOutboundQueue>
36         extends ChannelInboundHandlerAdapter
37         implements AutoCloseable {
38
39     private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
40
41     private enum PipelineState {
42         /**
43          * Netty thread is potentially idle, no assumptions
44          * can be made about its state.
45          */
46         IDLE,
47         /**
48          * Netty thread is currently reading, once the read completes,
49          * if will flush the queue in the {@link #WRITING} state.
50          */
51         READING,
52         /**
53          * Netty thread is currently performing a flush on the queue.
54          * It will then transition to {@link #IDLE} state.
55          */
56         WRITING,
57     }
58
59     /**
60      * Default low write watermark. Channel will become writable when number of outstanding
61      * bytes dips below this value.
62      */
63     private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
64
65     /**
66      * Default write high watermark. Channel will become un-writable when number of
67      * outstanding bytes hits this value.
68      */
69     private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
70
71     private final AtomicBoolean flushScheduled = new AtomicBoolean();
72     protected final ConnectionAdapterImpl parent;
73     protected final InetSocketAddress address;
74     protected final O currentQueue;
75     private final T handler;
76
77     // Accessed concurrently
78     private volatile PipelineState state = PipelineState.IDLE;
79
80     // Updated from netty only
81     private boolean alreadyReading;
82     protected boolean shuttingDown;
83
84     // Passed to executor to request triggering of flush
85     protected final Runnable flushRunnable = this::flush;
86
87     @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR",
88         justification = "Circular dependency on outbound queue")
89     AbstractOutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler) {
90         this.parent = requireNonNull(parent);
91         this.handler = requireNonNull(handler);
92         this.address = address;
93         /* Note: don't wish to use reflection here */
94         currentQueue = initializeStackedOutboudnqueue();
95         LOG.debug("Queue manager instantiated with queue {}", currentQueue);
96
97         handler.onConnectionQueueChanged(currentQueue);
98     }
99
100     /**
101      * Method has to initialize some child of {@link AbstractStackedOutboundQueue}.
102      *
103      * @return correct implementation of StacketOutboundqueue
104      */
105     protected abstract O initializeStackedOutboudnqueue();
106
107     @Override
108     public void close() {
109         handler.onConnectionQueueChanged(null);
110     }
111
112     @Override
113     public String toString() {
114         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
115     }
116
117     @Override
118     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
119         /*
120          * Tune channel write buffering. We increase the writability window
121          * to ensure we can flush an entire queue segment in one go. We definitely
122          * want to keep the difference above 64k, as that will ensure we use jam-packed
123          * TCP packets. UDP will fragment as appropriate.
124          */
125         ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
126         ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
127
128         super.handlerAdded(ctx);
129     }
130
131     @Override
132     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
133         super.channelActive(ctx);
134         conditionalFlush();
135     }
136
137     @Override
138     public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
139         super.channelReadComplete(ctx);
140
141         // Run flush regardless of writability. This is not strictly required, as
142         // there may be a scheduled flush. Instead of canceling it, which is expensive,
143         // we'll steal its work. Note that more work may accumulate in the time window
144         // between now and when the task will run, so it may not be a no-op after all.
145         //
146         // The reason for this is to fill the output buffer before we go into selection
147         // phase. This will make sure the pipe is full (in which case our next wake up
148         // will be the queue becoming writable).
149         writeAndFlush();
150         alreadyReading = false;
151     }
152
153     @Override
154     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
155         super.channelWritabilityChanged(ctx);
156
157         // The channel is writable again. There may be a flush task on the way, but let's
158         // steal its work, potentially decreasing latency. Since there is a window between
159         // now and when it will run, it may still pick up some more work to do.
160         LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
161         writeAndFlush();
162     }
163
164     @Override
165     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
166         // First of all, delegates disconnect event notification into ConnectionAdapter -> OF Plugin -> queue.close()
167         // -> queueHandler.onConnectionQueueChanged(null). The last call causes that no more entries are enqueued
168         // in the queue.
169         super.channelInactive(ctx);
170
171         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
172
173         // Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
174         // indefinitely) and failing not completed entries.
175         shuttingDown = true;
176         final long entries = currentQueue.startShutdown();
177         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
178
179         // Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
180         // when there is more than shutdownOffset messages enqueued in unflushed segments
181         // (AbstractStackedOutboundQueue#finishShutdown()).
182         scheduleFlush();
183     }
184
185     @Override
186     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
187         // Netty does not provide a 'start reading' callback, so this is our first
188         // (and repeated) chance to detect reading. Since this callback can be invoked
189         // multiple times, we keep a boolean we check. That prevents a volatile write
190         // on repeated invocations. It will be cleared in channelReadComplete().
191         if (!alreadyReading) {
192             alreadyReading = true;
193             state = PipelineState.READING;
194         }
195         super.channelRead(ctx, msg);
196     }
197
198     /**
199      * Invoked whenever a message comes in from the switch. Runs matching
200      * on all active queues in an attempt to complete a previous request.
201      *
202      * @param message Potential response message
203      * @return True if the message matched a previous request, false otherwise.
204      */
205     boolean onMessage(final OfHeader message) {
206         LOG.trace("Attempting to pair message {} to a request", message);
207
208         return currentQueue.pairRequest(message);
209     }
210
211     T getHandler() {
212         return handler;
213     }
214
215     void ensureFlushing() {
216         // If the channel is not writable, there's no point in waking up,
217         // once we become writable, we will run a full flush
218         if (!parent.getChannel().isWritable()) {
219             return;
220         }
221
222         // We are currently reading something, just a quick sync to ensure we will in fact
223         // flush state.
224         final PipelineState localState = state;
225         LOG.debug("Synchronize on pipeline state {}", localState);
226         switch (localState) {
227             case READING:
228                 // Netty thread is currently reading, it will flush the pipeline once it
229                 // finishes reading. This is a no-op situation.
230                 break;
231             case WRITING:
232             case IDLE:
233             default:
234                 // We cannot rely on the change being flushed, schedule a request
235                 scheduleFlush();
236         }
237     }
238
239     /**
240      * Method immediately response on Echo message.
241      *
242      * @param message incoming Echo message from device
243      * @param datapathId the dpnId of the node
244      */
245     void onEchoRequest(final EchoRequestMessage message, BigInteger datapathId) {
246         LOG.debug("echo request received: {} for the DPN {}", message.getXid(), datapathId);
247         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
248                 .setVersion(message.getVersion()).setXid(message.getXid()).build();
249         parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply));
250     }
251
252     /**
253      * Wraps outgoing message and includes listener attached to this message
254      * which is send to OFEncoder for serialization. Correct wrapper is
255      * selected by communication pipeline.
256      */
257     void writeMessage(final OfHeader message, final long now) {
258         final Object wrapper = makeMessageListenerWrapper(message);
259         parent.getChannel().write(wrapper);
260     }
261
262     /**
263      * Wraps outgoing message and includes listener attached to this message
264      * which is send to OFEncoder for serialization. Correct wrapper is
265      * selected by communication pipeline.
266      */
267     protected Object makeMessageListenerWrapper(@NonNull final OfHeader msg) {
268         checkArgument(msg != null);
269         if (address == null) {
270             return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
271         }
272         return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address);
273     }
274
275     /* NPE are coming from {@link OFEncoder#encode} from catch block and we don't wish to lost it */
276     private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = future -> {
277         if (future.cause() != null) {
278             LOG.warn("Message encoding fail !", future.cause());
279         }
280     };
281
282     /**
283      * Perform a single flush operation. We keep it here so we do not generate
284      * syntetic accessors for private fields. Otherwise it could be moved into {@link #flushRunnable}.
285      */
286     protected void flush() {
287         // If the channel is gone, just flush whatever is not completed
288         if (!shuttingDown) {
289             LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
290             writeAndFlush();
291             rescheduleFlush();
292         } else {
293             close();
294             if (currentQueue.finishShutdown(parent.getChannel())) {
295                 LOG.debug("Channel {} shutdown complete", parent.getChannel());
296             } else {
297                 LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
298                 rescheduleFlush();
299             }
300         }
301     }
302
303     private void scheduleFlush() {
304         if (flushScheduled.compareAndSet(false, true)) {
305             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
306             parent.getChannel().eventLoop().execute(flushRunnable);
307         } else {
308             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
309         }
310     }
311
312     private void writeAndFlush() {
313         state = PipelineState.WRITING;
314
315         final long start = System.nanoTime();
316
317         final int entries = currentQueue.writeEntries(parent.getChannel(), start);
318         if (entries > 0) {
319             LOG.trace("Flushing channel {}", parent.getChannel());
320             parent.getChannel().flush();
321         }
322
323         if (LOG.isDebugEnabled()) {
324             final long stop = System.nanoTime();
325             LOG.debug("Flushed {} messages to channel {} in {}us", entries, parent.getChannel(),
326                     TimeUnit.NANOSECONDS.toMicros(stop - start));
327         }
328
329         state = PipelineState.IDLE;
330     }
331
332     private void rescheduleFlush() {
333         /*
334          * We are almost ready to terminate. This is a bit tricky, because
335          * we do not want to have a race window where a message would be
336          * stuck on the queue without a flush being scheduled.
337          * So we mark ourselves as not running and then re-check if a
338          * flush out is needed. That will re-synchronized with other threads
339          * such that only one flush is scheduled at any given time.
340          */
341         if (!flushScheduled.compareAndSet(true, false)) {
342             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
343         }
344
345         conditionalFlush();
346     }
347
348     /**
349      * Schedule a queue flush if it is not empty and the channel is found
350      * to be writable. May only be called from Netty context.
351      */
352     private void conditionalFlush() {
353         if (currentQueue.needsFlush()) {
354             if (shuttingDown || parent.getChannel().isWritable()) {
355                 scheduleFlush();
356             } else {
357                 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
358             }
359         } else {
360             LOG.trace("Queue is empty, no flush needed");
361         }
362     }
363 }