/*
 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
package org.opendaylight.openflowjava.protocol.impl.core.connection;
import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
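
/**
 * Manages the outbound message queue of a single OpenFlow connection. Messages are
 * written out in batches, barrier requests are injected after {@code maxNonBarrierMessages}
 * non-barrier messages or after {@code maxBarrierNanos} of quiet time, and incoming
 * messages are paired with their outstanding requests. Flush scheduling is coordinated
 * with the channel's event loop so that at most one flush task is outstanding at a time.
 */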
final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter
        implements AutoCloseable {
    private enum PipelineState {
        /**
         * Netty thread is potentially idle, no assumptions
         * can be made about its state.
         */
        IDLE,
        /**
         * Netty thread is currently reading; once the read completes,
         * it will flush the queue in the {@link #WRITING} state.
         */
        READING,
        /**
         * Netty thread is currently performing a flush on the queue.
         * It will then transition to the {@link #IDLE} state.
         */
        WRITING,
    }

    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);

    /**
     * Default low write watermark. The channel becomes writable again once the number
     * of outstanding bytes dips below this value.
     */
    private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;

    /**
     * Default high write watermark. The channel becomes non-writable once the number
     * of outstanding bytes hits this value.
     */
    private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
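
    // The watermarks above are deliberately larger than Netty's defaults (32 KiB low /
    // 64 KiB high, at least in Netty 4.x) so that an entire queue segment can be
    // flushed in one go; see handlerAdded() below.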

    private final AtomicBoolean flushScheduled = new AtomicBoolean();
    private final StackedOutboundQueue currentQueue;
    private final ConnectionAdapterImpl parent;
    private final InetSocketAddress address;
    private final int maxNonBarrierMessages;
    private final long maxBarrierNanos;
    private final T handler;

    // Accessed concurrently
    private volatile PipelineState state = PipelineState.IDLE;

    // Updated from the Netty thread only
    private boolean alreadyReading;
    private boolean barrierTimerEnabled;
    private long lastBarrierNanos = System.nanoTime();
    private int nonBarrierMessages;
    private boolean shuttingDown;

    // Passed to executor to request triggering of flush
    private final Runnable flushRunnable = new Runnable() {
        @Override
        public void run() {
            flush();
        }
    };

    // Passed to executor to request a periodic barrier check
    private final Runnable barrierRunnable = new Runnable() {
        @Override
        public void run() {
            barrier();
        }
    };
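
    /**
     * Create a queue manager for a single connection.
     *
     * @param parent connection adapter which owns this manager
     * @param address remote datagram address, or null for stream (TCP) sessions
     * @param handler user-supplied handler, notified of queue changes and asked to create barriers
     * @param maxNonBarrierMessages number of non-barrier messages after which a barrier is injected
     * @param maxBarrierNanos maximum quiet interval, in nanoseconds, between barriers
     */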
    OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
            final int maxNonBarrierMessages, final long maxBarrierNanos) {
        this.parent = Preconditions.checkNotNull(parent);
        this.handler = Preconditions.checkNotNull(handler);
        Preconditions.checkArgument(maxNonBarrierMessages > 0);
        this.maxNonBarrierMessages = maxNonBarrierMessages;
        Preconditions.checkArgument(maxBarrierNanos > 0);
        this.maxBarrierNanos = maxBarrierNanos;
        this.address = address;
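        // A null address indicates a stream (TCP) session; a non-null address is the
        // datagram target used to select the UDP wrapper in makeMessageListenerWrapper().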

        currentQueue = new StackedOutboundQueue(this);
        LOG.debug("Queue manager instantiated with queue {}", currentQueue);
        handler.onConnectionQueueChanged(currentQueue);
    }

    @Override
    public void close() {
        handler.onConnectionQueueChanged(null);
    }

    private void scheduleBarrierTimer(final long now) {
        long next = lastBarrierNanos + maxBarrierNanos;
        if (next < now) {
            LOG.trace("Attempted to schedule barrier in the past, resetting to maximum");
            next = now + maxBarrierNanos;
        }

        final long delay = next - now;
        LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
        parent.getChannel().eventLoop().schedule(barrierRunnable, delay, TimeUnit.NANOSECONDS);
        barrierTimerEnabled = true;
    }
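
    /**
     * Reserve an XID and commit a barrier request to the queue, unless the queue
     * already contains an outstanding barrier.
     */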
    private void scheduleBarrierMessage() {
        final Long xid = currentQueue.reserveBarrierIfNeeded();
        if (xid == null) {
            LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
            return;
        }

        currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
        LOG.trace("Barrier XID {} scheduled", xid);
    }

    /**
     * Invoked whenever a message comes in from the switch. Runs matching
     * on all active queues in an attempt to complete a previous request.
     *
     * @param message potential response message
     * @return true if the message matched a previous request, false otherwise
     */
    boolean onMessage(final OfHeader message) {
        LOG.trace("Attempting to pair message {} to a request", message);
        return currentQueue.pairRequest(message);
    }
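
    /**
     * Schedule a flush task on the channel's event loop. The atomic flag guarantees
     * that at most one flush task is outstanding at any time.
     */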
    private void scheduleFlush() {
        if (flushScheduled.compareAndSet(false, true)) {
            LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
            parent.getChannel().eventLoop().execute(flushRunnable);
        } else {
            LOG.trace("Flush task is already present on channel {}", parent.getChannel());
        }
    }

    /**
     * Periodic barrier check.
     */
    protected void barrier() {
        LOG.debug("Channel {} barrier timer expired", parent.getChannel());
        barrierTimerEnabled = false;
        if (shuttingDown) {
            LOG.trace("Channel shut down, not processing barrier");
            return;
        }

        final long now = System.nanoTime();
        final long sinceLast = now - lastBarrierNanos;
        if (sinceLast >= maxBarrierNanos) {
            LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
            // FIXME: we should be tracking requests/responses instead of this
            if (nonBarrierMessages == 0) {
                LOG.trace("No messages written since last barrier, not issuing one");
            } else {
                scheduleBarrierMessage();
            }
        }
    }

    private void rescheduleFlush() {
        /*
         * We are almost ready to terminate. This is a bit tricky, because
         * we do not want to have a race window where a message would be
         * stuck on the queue without a flush being scheduled.
         *
         * So we mark ourselves as not running and then re-check whether a
         * flush is needed. That re-synchronizes with other threads such
         * that only one flush is scheduled at any given time.
         */
        if (!flushScheduled.compareAndSet(true, false)) {
            LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
        }

        conditionalFlush();
    }
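
    /**
     * Write out all currently queued entries and flush the channel, tracking the
     * pipeline state so {@link #ensureFlushing()} can tell whether a flush is imminent.
     */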
    private void writeAndFlush() {
        state = PipelineState.WRITING;

        final long start = System.nanoTime();
        final int entries = currentQueue.writeEntries(parent.getChannel(), start);
        if (entries > 0) {
            LOG.trace("Flushing channel {}", parent.getChannel());
            parent.getChannel().flush();
        }

        if (LOG.isDebugEnabled()) {
            final long stop = System.nanoTime();
            LOG.debug("Flushed {} messages to channel {} in {}us", entries,
                parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start));
        }

        state = PipelineState.IDLE;
    }

    /**
     * Perform a single flush operation. We keep it here so we do not generate
     * synthetic accessors for private fields. Otherwise it could be moved into
     * {@link #flushRunnable}.
     */
    protected void flush() {
        // If the channel is gone, just flush whatever is not completed
        if (!shuttingDown) {
            LOG.trace("Dequeuing messages to channel {}", parent.getChannel());
            writeAndFlush();
            rescheduleFlush();
        } else if (currentQueue.finishShutdown()) {
            handler.onConnectionQueueChanged(null);
            LOG.debug("Channel {} shutdown complete", parent.getChannel());
        } else {
            LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
            rescheduleFlush();
        }
    }

    /**
     * Schedule a queue flush if the queue is non-empty and the channel is found
     * to be writable. May only be called from the Netty context.
     */
    private void conditionalFlush() {
        if (currentQueue.needsFlush()) {
            if (shuttingDown || parent.getChannel().isWritable()) {
                scheduleFlush();
            } else {
                LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
            }
        } else {
            LOG.trace("Queue is empty, no flush needed");
        }
    }
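
    // The ChannelInboundHandler callbacks below are invoked on the channel's event loop
    // (assuming this handler is registered without a separate executor group), so they
    // may touch the non-volatile bookkeeping fields directly.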
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        conditionalFlush();
    }

    @Override
    public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
        /*
         * Tune channel write buffering. We increase the writability window
         * to ensure we can flush an entire queue segment in one go. We definitely
         * want to keep the difference above 64k, as that will ensure we use jam-packed
         * TCP packets. UDP will fragment as appropriate.
         */
        ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
        ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);

        super.handlerAdded(ctx);
    }

    @Override
    public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);

        // The channel is writable again. There may be a flush task on the way, but let's
        // steal its work, potentially decreasing latency. Since there is a window between
        // now and when it will run, it may still pick up some more work to do.
        LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
        writeAndFlush();
    }

    @Override
    public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);

        LOG.debug("Channel {} initiating shutdown...", ctx.channel());

        shuttingDown = true;
        final long entries = currentQueue.startShutdown(ctx.channel());
        LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());

        scheduleFlush();
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        // Netty does not provide a 'start reading' callback, so this is our first
        // (and repeated) chance to detect reading. Since this callback can be invoked
        // multiple times, we keep a boolean we check, which prevents a volatile write
        // on repeated invocations. It is cleared in channelReadComplete().
        if (!alreadyReading) {
            alreadyReading = true;
            state = PipelineState.READING;
        }
        super.channelRead(ctx, msg);
    }

    @Override
    public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);

        // Run flush regardless of writability. This is not strictly required, as
        // there may be a scheduled flush. Instead of canceling it, which is expensive,
        // we'll steal its work. Note that more work may accumulate in the time window
        // between now and when the task will run, so it may not be a no-op after all.
        //
        // The reason for this is to fill the output buffer before we go into the selection
        // phase. This will make sure the pipe is full (in which case our next wake-up
        // will be the queue becoming writable).
        alreadyReading = false;
        writeAndFlush();
    }

    @Override
    public String toString() {
        return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
    }
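
    /**
     * Make sure a flush will happen in the near future. This may be called from any
     * thread: if the Netty thread is currently reading, the read-complete path will
     * flush for us; otherwise we schedule a flush task ourselves.
     */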
    void ensureFlushing() {
        // If the channel is not writable, there is no point in waking up;
        // once we become writable, we will run a full flush.
        if (!parent.getChannel().isWritable()) {
            return;
        }

        // We may be in the middle of a read; a quick synchronization to ensure
        // a flush will in fact happen.
        final PipelineState localState = state;
        LOG.debug("Synchronize on pipeline state {}", localState);
        switch (localState) {
            case READING:
                // Netty thread is currently reading, it will flush the pipeline once it
                // finishes reading. This is a no-op situation.
                break;
            case WRITING:
            case IDLE:
            default:
                // We cannot rely on the change being flushed, schedule a request
                scheduleFlush();
        }
    }
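
    /**
     * Answer an echo request immediately, bypassing the outbound queue so the reply
     * is not delayed behind queued requests.
     */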
    void onEchoRequest(final EchoRequestMessage message) {
        final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData())
                .setVersion(message.getVersion()).setXid(message.getXid()).build();
        parent.getChannel().writeAndFlush(makeMessageListenerWrapper(reply));
    }

    /**
     * Write a message into the underlying channel.
     *
     * @param message message to be written
     * @param now time reference for 'now'. We take this as an argument, as we need
     *            a timestamp to mark barrier messages we see swinging by. That
     *            timestamp does not need to be completely accurate, hence we use
     *            the flush start time. The alternative would be to measure
     *            System.nanoTime() for each barrier, needlessly adding overhead.
     */
    void writeMessage(final OfHeader message, final long now) {
        final Object wrapper = makeMessageListenerWrapper(message);
        parent.getChannel().write(wrapper);

        if (message instanceof BarrierInput) {
            LOG.trace("Barrier message seen, resetting counters");
            nonBarrierMessages = 0;
            lastBarrierNanos = now;
        } else {
            nonBarrierMessages++;
            if (nonBarrierMessages >= maxNonBarrierMessages) {
                LOG.trace("Scheduling barrier request after {} non-barrier messages", nonBarrierMessages);
                scheduleBarrierMessage();
            } else if (!barrierTimerEnabled) {
                scheduleBarrierTimer(now);
            }
        }
    }

    /**
     * Wrap an outgoing message with the listener attached to it before it is handed
     * to the OFEncoder for serialization. The correct wrapper is selected based on
     * whether this is a stream or a datagram session.
     *
     * @param msg message to be wrapped
     * @return wrapped message
     */
    private Object makeMessageListenerWrapper(@Nonnull final OfHeader msg) {
        Preconditions.checkNotNull(msg);

        if (address == null) {
            return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
        }
        return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, address);
    }

    /* NPEs coming out of the catch block in {@link OFEncoder#encode} would otherwise
     * be lost, so log any failure reported through this listener. */
    private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER =
        new GenericFutureListener<Future<Void>>() {
            private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener");

            @Override
            public void operationComplete(final Future<Void> future) throws Exception {
                if (future.cause() != null) {
                    LOGGER.warn("Message encoding failed", future.cause());
                }
            }
        };
}