Merge "Use String(byte[], Charset)"
[openflowplugin.git] / openflowjava / openflow-protocol-impl / src / main / java / org / opendaylight / openflowjava / protocol / impl / core / connection / AbstractOutboundQueueManager.java
1 /*
2  * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
9
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
12
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelInboundHandlerAdapter;
15 import io.netty.util.concurrent.Future;
16 import io.netty.util.concurrent.GenericFutureListener;
17 import java.math.BigInteger;
18 import java.net.InetSocketAddress;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * Class capsulate basic processing for stacking requests for netty channel
32  * and provide functionality for pairing request/response device message communication.
33  */
34 abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O extends AbstractStackedOutboundQueue>
35         extends ChannelInboundHandlerAdapter
36         implements AutoCloseable {
37
38     private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
39
40     private enum PipelineState {
41         /**
42          * Netty thread is potentially idle, no assumptions
43          * can be made about its state.
44          */
45         IDLE,
46         /**
47          * Netty thread is currently reading, once the read completes,
48          * if will flush the queue in the {@link #WRITING} state.
49          */
50         READING,
51         /**
52          * Netty thread is currently performing a flush on the queue.
53          * It will then transition to {@link #IDLE} state.
54          */
55         WRITING,
56     }
57
58     /**
59      * Default low write watermark. Channel will become writable when number of outstanding
60      * bytes dips below this value.
61      */
62     private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
63
64     /**
65      * Default write high watermark. Channel will become un-writable when number of
66      * outstanding bytes hits this value.
67      */
68     private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
69
70     private final AtomicBoolean flushScheduled = new AtomicBoolean();
71     protected final ConnectionAdapterImpl parent;
72     protected final InetSocketAddress address;
73     protected final O currentQueue;
74     private final T handler;
75
76     // Accessed concurrently
77     private volatile PipelineState state = PipelineState.IDLE;
78
79     // Updated from netty only
80     private boolean alreadyReading;
81     protected boolean shuttingDown;
82
83     // Passed to executor to request triggering of flush
84     protected final Runnable flushRunnable = this::flush;
85
86     AbstractOutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler) {
87         this.parent = requireNonNull(parent);
88         this.handler = requireNonNull(handler);
89         this.address = address;
90         /* Note: don't wish to use reflection here */
91         currentQueue = initializeStackedOutboudnqueue();
92         LOG.debug("Queue manager instantiated with queue {}", currentQueue);
93
94         handler.onConnectionQueueChanged(currentQueue);
95     }
96
97     /**
98      * Method has to initialize some child of {@link AbstractStackedOutboundQueue}.
99      *
100      * @return correct implementation of StacketOutboundqueue
101      */
102     protected abstract O initializeStackedOutboudnqueue();
103
104     @Override
105     public void close() {
106         handler.onConnectionQueueChanged(null);
107     }
108
109     @Override
110     public String toString() {
111         return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
112     }
113
114     @Override
115     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
116         /*
117          * Tune channel write buffering. We increase the writability window
118          * to ensure we can flush an entire queue segment in one go. We definitely
119          * want to keep the difference above 64k, as that will ensure we use jam-packed
120          * TCP packets. UDP will fragment as appropriate.
121          */
122         ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
123         ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
124
125         super.handlerAdded(ctx);
126     }
127
128     @Override
129     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
130         super.channelActive(ctx);
131         conditionalFlush();
132     }
133
134     @Override
135     public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
136         super.channelReadComplete(ctx);
137
138         // Run flush regardless of writability. This is not strictly required, as
139         // there may be a scheduled flush. Instead of canceling it, which is expensive,
140         // we'll steal its work. Note that more work may accumulate in the time window
141         // between now and when the task will run, so it may not be a no-op after all.
142         //
143         // The reason for this is to fill the output buffer before we go into selection
144         // phase. This will make sure the pipe is full (in which case our next wake up
145         // will be the queue becoming writable).
146         writeAndFlush();
147         alreadyReading = false;
148     }
149
150     @Override
151     public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
152         super.channelWritabilityChanged(ctx);
153
154         // The channel is writable again. There may be a flush task on the way, but let's
155         // steal its work, potentially decreasing latency. Since there is a window between
156         // now and when it will run, it may still pick up some more work to do.
157         LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
158         writeAndFlush();
159     }
160
161     @Override
162     public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
163         // First of all, delegates disconnect event notification into ConnectionAdapter -> OF Plugin -> queue.close()
164         // -> queueHandler.onConnectionQueueChanged(null). The last call causes that no more entries are enqueued
165         // in the queue.
166         super.channelInactive(ctx);
167
168         LOG.debug("Channel {} initiating shutdown...", ctx.channel());
169
170         // Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
171         // indefinitely) and failing not completed entries.
172         shuttingDown = true;
173         final long entries = currentQueue.startShutdown();
174         LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
175
176         // Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
177         // when there is more than shutdownOffset messages enqueued in unflushed segments
178         // (AbstractStackedOutboundQueue#finishShutdown()).
179         scheduleFlush();
180     }
181
182     @Override
183     public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
184         // Netty does not provide a 'start reading' callback, so this is our first
185         // (and repeated) chance to detect reading. Since this callback can be invoked
186         // multiple times, we keep a boolean we check. That prevents a volatile write
187         // on repeated invocations. It will be cleared in channelReadComplete().
188         if (!alreadyReading) {
189             alreadyReading = true;
190             state = PipelineState.READING;
191         }
192         super.channelRead(ctx, msg);
193     }
194
195     /**
196      * Invoked whenever a message comes in from the switch. Runs matching
197      * on all active queues in an attempt to complete a previous request.
198      *
199      * @param message Potential response message
200      * @return True if the message matched a previous request, false otherwise.
201      */
202     boolean onMessage(final OfHeader message) {
203         LOG.trace("Attempting to pair message {} to a request", message);
204
205         return currentQueue.pairRequest(message);
206     }
207
208     T getHandler() {
209         return handler;
210     }
211
212     void ensureFlushing() {
213         // If the channel is not writable, there's no point in waking up,
214         // once we become writable, we will run a full flush
215         if (!parent.getChannel().isWritable()) {
216             return;
217         }
218
219         // We are currently reading something, just a quick sync to ensure we will in fact
220         // flush state.
221         final PipelineState localState = state;
222         LOG.debug("Synchronize on pipeline state {}", localState);
223         switch (localState) {
224             case READING:
225                 // Netty thread is currently reading, it will flush the pipeline once it
226                 // finishes reading. This is a no-op situation.
227                 break;
228             case WRITING:
229             case IDLE:
230             default:
231                 // We cannot rely on the change being flushed, schedule a request
232                 scheduleFlush();
233         }
234     }
235
236     /**
237      * Method immediately response on Echo message.
238      *
239      * @param message incoming Echo message from device
240      * @param datapathId the dpnId of the node
241      */
242     void onEchoRequest(final EchoRequestMessage message, BigInteger datapathId) {
243         LOG.debug("echo request received: {} for the DPN {}", message.getXid(), datapathId);
244         final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
245                 .setVersion(message.getVersion()).setXid(message.getXid()).build();
246         parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply));
247     }
248
249     /**
250      * Wraps outgoing message and includes listener attached to this message
251      * which is send to OFEncoder for serialization. Correct wrapper is
252      * selected by communication pipeline.
253      */
254     void writeMessage(final OfHeader message, final long now) {
255         final Object wrapper = makeMessageListenerWrapper(message);
256         parent.getChannel().write(wrapper);
257     }
258
259     /**
260      * Wraps outgoing message and includes listener attached to this message
261      * which is send to OFEncoder for serialization. Correct wrapper is
262      * selected by communication pipeline.
263      */
264     protected Object makeMessageListenerWrapper(@NonNull final OfHeader msg) {
265         checkArgument(msg != null);
266         if (address == null) {
267             return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
268         }
269         return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address);
270     }
271
272     /* NPE are coming from {@link OFEncoder#encode} from catch block and we don't wish to lost it */
273     private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = future -> {
274         if (future.cause() != null) {
275             LOG.warn("Message encoding fail !", future.cause());
276         }
277     };
278
279     /**
280      * Perform a single flush operation. We keep it here so we do not generate
281      * syntetic accessors for private fields. Otherwise it could be moved into {@link #flushRunnable}.
282      */
283     protected void flush() {
284         // If the channel is gone, just flush whatever is not completed
285         if (!shuttingDown) {
286             LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
287             writeAndFlush();
288             rescheduleFlush();
289         } else {
290             close();
291             if (currentQueue.finishShutdown(parent.getChannel())) {
292                 LOG.debug("Channel {} shutdown complete", parent.getChannel());
293             } else {
294                 LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
295                 rescheduleFlush();
296             }
297         }
298     }
299
300     private void scheduleFlush() {
301         if (flushScheduled.compareAndSet(false, true)) {
302             LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
303             parent.getChannel().eventLoop().execute(flushRunnable);
304         } else {
305             LOG.trace("Flush task is already present on channel {}", parent.getChannel());
306         }
307     }
308
309     private void writeAndFlush() {
310         state = PipelineState.WRITING;
311
312         final long start = System.nanoTime();
313
314         final int entries = currentQueue.writeEntries(parent.getChannel(), start);
315         if (entries > 0) {
316             LOG.trace("Flushing channel {}", parent.getChannel());
317             parent.getChannel().flush();
318         }
319
320         if (LOG.isDebugEnabled()) {
321             final long stop = System.nanoTime();
322             LOG.debug("Flushed {} messages to channel {} in {}us", entries, parent.getChannel(),
323                     TimeUnit.NANOSECONDS.toMicros(stop - start));
324         }
325
326         state = PipelineState.IDLE;
327     }
328
329     private void rescheduleFlush() {
330         /*
331          * We are almost ready to terminate. This is a bit tricky, because
332          * we do not want to have a race window where a message would be
333          * stuck on the queue without a flush being scheduled.
334          * So we mark ourselves as not running and then re-check if a
335          * flush out is needed. That will re-synchronized with other threads
336          * such that only one flush is scheduled at any given time.
337          */
338         if (!flushScheduled.compareAndSet(true, false)) {
339             LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
340         }
341
342         conditionalFlush();
343     }
344
345     /**
346      * Schedule a queue flush if it is not empty and the channel is found
347      * to be writable. May only be called from Netty context.
348      */
349     private void conditionalFlush() {
350         if (currentQueue.needsFlush()) {
351             if (shuttingDown || parent.getChannel().isWritable()) {
352                 scheduleFlush();
353             } else {
354                 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
355             }
356         } else {
357             LOG.trace("Queue is empty, no flush needed");
358         }
359     }
360 }