2 * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import static com.google.common.base.Preconditions.checkArgument;
11 import static java.util.Objects.requireNonNull;
13 import io.netty.channel.ChannelHandlerContext;
14 import io.netty.channel.ChannelInboundHandlerAdapter;
15 import io.netty.util.concurrent.Future;
16 import io.netty.util.concurrent.GenericFutureListener;
17 import java.math.BigInteger;
18 import java.net.InetSocketAddress;
19 import java.util.concurrent.TimeUnit;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import org.eclipse.jdt.annotation.NonNull;
22 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
23 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
24 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
25 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
26 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
31 * Class encapsulates basic processing for stacking requests for a netty channel
32 * and provides functionality for pairing request/response device message communication.
34 abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O extends AbstractStackedOutboundQueue>
35 extends ChannelInboundHandlerAdapter
36 implements AutoCloseable {
38 private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
40 private enum PipelineState {
42 * Netty thread is potentially idle, no assumptions
43 * can be made about its state.
47 * Netty thread is currently reading, once the read completes,
48 * it will flush the queue in the {@link #WRITING} state.
52 * Netty thread is currently performing a flush on the queue.
53 * It will then transition to {@link #IDLE} state.
59 * Default low write watermark. Channel will become writable when number of outstanding
60 * bytes dips below this value.
// 128 KiB.
62 private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
65 * Default write high watermark. Channel will become un-writable when number of
66 * outstanding bytes hits this value.
// Twice the low watermark, i.e. 256 KiB, which leaves a 128 KiB window between the two marks.
68 private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
// Set by scheduleFlush() when it enqueues flushRunnable on the event loop; reset by rescheduleFlush().
70 private final AtomicBoolean flushScheduled = new AtomicBoolean();
// Owning connection adapter; every channel access in this class goes through parent.getChannel().
71 protected final ConnectionAdapterImpl parent;
// Stored without a null check. A null value makes makeMessageListenerWrapper() build the plain
// MessageListenerWrapper, a non-null value the UdpMessageListenerWrapper.
72 protected final InetSocketAddress address;
// Assigned once in the constructor from initializeStackedOutboudnqueue().
73 protected final O currentQueue;
// Notified with the queue on construction and with null on close().
74 private final T handler;
76 // Accessed concurrently
77 private volatile PipelineState state = PipelineState.IDLE;
79 // Updated from netty only
// Plain (non-volatile) guard that lets channelRead() skip the volatile write to 'state' on repeated reads.
80 private boolean alreadyReading;
// Read by conditionalFlush() to allow a flush even when the channel is not writable.
81 protected boolean shuttingDown;
83 // Passed to executor to request triggering of flush
84 protected final Runnable flushRunnable = this::flush;
// Creates the manager for one connection, builds its queue and announces that queue to the handler.
//
// parent  - owning connection adapter, must not be null (requireNonNull)
// address - stored as-is, may be null; only consulted by makeMessageListenerWrapper()
// handler - queue handler, must not be null (requireNonNull)
86 AbstractOutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler) {
87 this.parent = requireNonNull(parent);
88 this.handler = requireNonNull(handler);
89 this.address = address;
90 /* Note: don't wish to use reflection here */
// NOTE(review): this calls an abstract method from the constructor, so the subclass implementation
// runs before the subclass' own field initializers and constructor body have executed.
91 currentQueue = initializeStackedOutboudnqueue();
92 LOG.debug("Queue manager instantiated with queue {}", currentQueue);
// The handler learns about the queue only after it has been assigned and logged.
94 handler.onConnectionQueueChanged(currentQueue);
98 * Method has to initialize some child of {@link AbstractStackedOutboundQueue}.
100 * @return correct implementation of {@link AbstractStackedOutboundQueue}
// Invoked exactly once, from the constructor; the returned instance becomes currentQueue.
102 protected abstract O initializeStackedOutboudnqueue();
// Tells the handler that the connection queue changed to null, i.e. there is no queue to use any more.
105 public void close() {
106 handler.onConnectionQueueChanged(null);
// Renders the parent's channel together with the current value of the flushScheduled flag.
110 public String toString() {
111 return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
// Applies this class' default write-buffer watermarks to the channel, then delegates to the superclass.
115 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
117 * Tune channel write buffering. We increase the writability window
118 * to ensure we can flush an entire queue segment in one go. We definitely
119 * want to keep the difference above 64k, as that will ensure we use jam-packed
120 * TCP packets. UDP will fragment as appropriate.
// NOTE(review): the high mark is set before the low mark. This order looks deliberate, since Netty is
// expected to reject a low watermark above the currently configured high watermark; confirm before reordering.
122 ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
123 ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
125 super.handlerAdded(ctx);
// Forwards the channel-active event to the superclass implementation.
129 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
130 super.channelActive(ctx);
// Forwards the read-complete event and ends the current read batch as tracked by alreadyReading.
135 public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
136 super.channelReadComplete(ctx);
138 // Run flush regardless of writability. This is not strictly required, as
139 // there may be a scheduled flush. Instead of canceling it, which is expensive,
140 // we'll steal its work. Note that more work may accumulate in the time window
141 // between now and when the task will run, so it may not be a no-op after all.
143 // The reason for this is to fill the output buffer before we go into selection
144 // phase. This will make sure the pipe is full (in which case our next wake up
145 // will be the queue becoming writable).
// Clearing the flag lets the next channelRead() switch the pipeline state to READING again.
147 alreadyReading = false;
// Forwards the writability-changed event to the superclass and logs the change for the parent's channel.
151 public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
152 super.channelWritabilityChanged(ctx);
154 // The channel is writable again. There may be a flush task on the way, but let's
155 // steal its work, potentially decreasing latency. Since there is a window between
156 // now and when it will run, it may still pick up some more work to do.
157 LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
// Handles channel disconnect: the event is propagated upstream first, then shutdown of currentQueue starts.
162 public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
163 // First of all, delegates disconnect event notification into ConnectionAdapter -> OF Plugin -> queue.close()
164 // -> queueHandler.onConnectionQueueChanged(null). The last call causes that no more entries are enqueued
166 super.channelInactive(ctx);
168 LOG.debug("Channel {} initiating shutdown...", ctx.channel());
170 // Then we start queue shutdown, start counting written messages (so that we don't keep sending messages
171 // indefinitely) and failing not completed entries.
// The value returned by startShutdown() is only used for the debug message below.
173 final long entries = currentQueue.startShutdown();
174 LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
176 // Finally, we schedule flush task that will take care of unflushed entries. We also cover the case,
177 // when there is more than shutdownOffset messages enqueued in unflushed segments
178 // (AbstractStackedOutboundQueue#finishShutdown()).
// Marks the pipeline as READING on the first message of a read batch, then forwards the message unchanged.
183 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
184 // Netty does not provide a 'start reading' callback, so this is our first
185 // (and repeated) chance to detect reading. Since this callback can be invoked
186 // multiple times, we keep a boolean we check. That prevents a volatile write
187 // on repeated invocations. It will be cleared in channelReadComplete().
188 if (!alreadyReading) {
189 alreadyReading = true;
// 'state' is volatile, so this write is what other threads observe through ensureFlushing().
190 state = PipelineState.READING;
192 super.channelRead(ctx, msg);
196 * Invoked whenever a message comes in from the switch. Runs matching
197 * on all active queues in an attempt to complete a previous request.
199 * @param message Potential response message
200 * @return True if the message matched a previous request, false otherwise.
// Traces the incoming message and delegates the pairing decision entirely to currentQueue.pairRequest().
202 boolean onMessage(final OfHeader message) {
203 LOG.trace("Attempting to pair message {} to a request", message);
205 return currentQueue.pairRequest(message);
// Decides, based on channel writability and the current pipeline state, whether a flush must be requested.
212 void ensureFlushing() {
213 // If the channel is not writable, there's no point in waking up,
214 // once we become writable, we will run a full flush
215 if (!parent.getChannel().isWritable()) {
219 // We are currently reading something, just a quick sync to ensure we will in fact
// The volatile field is read once into a local so the log line and the switch see the same value.
221 final PipelineState localState = state;
222 LOG.debug("Synchronize on pipeline state {}", localState);
223 switch (localState) {
225 // Netty thread is currently reading, it will flush the pipeline once it
226 // finishes reading. This is a no-op situation.
231 // We cannot rely on the change being flushed, schedule a request
237 * Immediately responds to an Echo request message.
239 * @param message incoming Echo message from device
240 * @param datapathId the dpnId of the node
// Answers an echo request with an echo reply that copies the request's data, version and xid.
// datapathId is used only in the debug message.
242 void onEchoRequest(final EchoRequestMessage message, BigInteger datapathId) {
243 LOG.debug("echo request received: {} for the DPN {}", message.getXid(), datapathId);
244 final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
245 .setVersion(message.getVersion()).setXid(message.getXid()).build();
// The reply is written and flushed straight to the channel rather than being placed on currentQueue.
246 parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply));
250 * Wraps outgoing message and includes listener attached to this message
251 * which is sent to OFEncoder for serialization. Correct wrapper is
252 * selected by communication pipeline.
// Wraps the message via makeMessageListenerWrapper() and writes it to the channel without flushing.
// NOTE(review): the 'now' parameter is not used by the statements visible here.
254 void writeMessage(final OfHeader message, final long now) {
255 final Object wrapper = makeMessageListenerWrapper(message);
256 parent.getChannel().write(wrapper);
260 * Wraps outgoing message and includes listener attached to this message
261 * which is sent to OFEncoder for serialization. Correct wrapper is
262 * selected by communication pipeline.
// Builds the wrapper object that is written to the channel for the given message.
// Both wrapper variants are given LOG_ENCODER_LISTENER as their listener.
264 protected Object makeMessageListenerWrapper(@NonNull final OfHeader msg) {
// Guava's checkArgument rejects a null message with an IllegalArgumentException.
265 checkArgument(msg != null);
// No address was supplied at construction: use the plain wrapper.
266 if (address == null) {
267 return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
// Otherwise the wrapper additionally carries the address.
269 return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address);
272 /* NPEs are coming from the catch block in {@link OFEncoder#encode} and we don't wish to lose them */
// Shared listener handed to every message wrapper; it logs a warning, with the cause attached,
// when the future it is notified about carries a failure cause.
273 private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = future -> {
274 if (future.cause() != null) {
275 LOG.warn("Message encoding fail !", future.cause());
280 * Perform a single flush operation. We keep it here so we do not generate
281 * synthetic accessors for private fields. Otherwise it could be moved into {@link #flushRunnable}.
// Body of flushRunnable (this::flush), which scheduleFlush() submits to the channel's event loop.
283 protected void flush() {
284 // If the channel is gone, just flush whatever is not completed
286 LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
// finishShutdown() reports through its boolean result whether the queue shutdown has completed.
291 if (currentQueue.finishShutdown(parent.getChannel())) {
292 LOG.debug("Channel {} shutdown complete", parent.getChannel());
294 LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
// Submits flushRunnable to the channel's event loop unless a flush is already marked as scheduled.
300 private void scheduleFlush() {
// Only the caller that flips the flag from false to true enqueues the task; concurrent callers lose the CAS.
301 if (flushScheduled.compareAndSet(false, true)) {
302 LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
303 parent.getChannel().eventLoop().execute(flushRunnable);
305 LOG.trace("Flush task is already present on channel {}", parent.getChannel());
// Writes queued entries to the channel and flushes it, bracketing the work with the WRITING and IDLE states.
309 private void writeAndFlush() {
310 state = PipelineState.WRITING;
// The same timestamp is passed to writeEntries() and used as the start of the timing reported below.
312 final long start = System.nanoTime();
314 final int entries = currentQueue.writeEntries(parent.getChannel(), start);
316 LOG.trace("Flushing channel {}", parent.getChannel());
317 parent.getChannel().flush();
// Guarded so that the second nanoTime() call and the unit conversion only happen when debug logging is on.
320 if (LOG.isDebugEnabled()) {
321 final long stop = System.nanoTime();
322 LOG.debug("Flushed {} messages to channel {} in {}us", entries, parent.getChannel(),
323 TimeUnit.NANOSECONDS.toMicros(stop - start));
326 state = PipelineState.IDLE;
// Clears the flushScheduled flag at the end of a flush run so that a new flush can be scheduled.
329 private void rescheduleFlush() {
331 * We are almost ready to terminate. This is a bit tricky, because
332 * we do not want to have a race window where a message would be
333 * stuck on the queue without a flush being scheduled.
334 * So we mark ourselves as not running and then re-check if a
335 * flush out is needed. That will re-synchronized with other threads
336 * such that only one flush is scheduled at any given time.
// A failed CAS means the flag was already false, which is unexpected here and therefore logged as a warning.
338 if (!flushScheduled.compareAndSet(true, false)) {
339 LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
346 * Schedule a queue flush if it is not empty and the channel is found
347 * to be writable. May only be called from Netty context.
// Checks whether currentQueue has pending work and whether the channel can take it, logging each negative outcome.
349 private void conditionalFlush() {
350 if (currentQueue.needsFlush()) {
// During shutdown the writability check is bypassed.
351 if (shuttingDown || parent.getChannel().isWritable()) {
354 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
357 LOG.trace("Queue is empty, no flush needed");