2 * Copyright (c) 2015 Pantheon Technologies s.r.o. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowjava.protocol.impl.core.connection;
10 import com.google.common.base.Preconditions;
11 import io.netty.channel.ChannelHandlerContext;
12 import io.netty.channel.ChannelInboundHandlerAdapter;
13 import java.net.InetSocketAddress;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.atomic.AtomicBoolean;
16 import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
17 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
18 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
19 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
20 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
21 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
25 final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
// Class-wide SLF4J logger.
26 private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
29 * Default low write watermark. Channel will become writable when number of outstanding
30 * bytes dips below this value.
32 private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;
35 * Default write high watermark. Channel will become un-writable when number of
36 * outstanding bytes hits this value.
38 private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;
// Flush-scheduling flag: set via compareAndSet(false, true) in scheduleFlush() and
// channelWritabilityChanged(), cleared via compareAndSet(true, false) in rescheduleFlush().
40 private final AtomicBoolean flushScheduled = new AtomicBoolean();
// The single queue created in the constructor and announced to the handler.
41 private final StackedOutboundQueue currentQueue;
// Owning connection adapter; the source of the Netty channel used throughout.
42 private final ConnectionAdapterImpl parent;
// When null, writeMessage() wraps messages in MessageListenerWrapper; otherwise in
// UdpMessageListenerWrapper carrying this address.
43 private final InetSocketAddress address;
// Number of non-barrier messages after which writeMessage() schedules a barrier.
44 private final int maxNonBarrierMessages;
// Maximum time between barriers, in nanoseconds; used by the barrier timer.
45 private final long maxBarrierNanos;
// Receives queue-change notifications and creates barrier requests.
46 private final T handler;
48 // Accessed concurrently
// NOTE(review): no read or write of this flag is visible in this listing -- confirm usage.
49 private volatile boolean reading;
51 // Updated from netty only
// Set in channelRead(), cleared in channelReadComplete().
52 private boolean alreadyReading;
// True while a barrier timer is pending: set in scheduleBarrierTimer(), cleared in barrier().
53 private boolean barrierTimerEnabled;
// Time reference of the last barrier written; updated in writeMessage().
54 private long lastBarrierNanos = System.nanoTime();
// Count of non-barrier messages written since the last barrier.
55 private int nonBarrierMessages;
// Consulted by conditionalFlush(); NOTE(review): the assignment is not visible in this listing.
56 private boolean shuttingDown;
58 // Passed to executor to request triggering of flush
// NOTE(review): the body of this Runnable is not visible here; presumably it calls flush() -- confirm.
59 private final Runnable flushRunnable = new Runnable() {
66 // Passed to executor to request a periodic barrier check
// NOTE(review): the body of this Runnable is not visible here; presumably it calls barrier() -- confirm.
67 private final Runnable barrierRunnable = new Runnable() {
/**
 * Create a queue manager for a connection and announce its queue to the handler.
 *
 * @param parent owning connection adapter, must not be null
 * @param address remote address stored for later use by {@code writeMessage}; a null
 *                value selects the plain (non-UDP) message wrapper there
 * @param handler queue handler, must not be null
 * @param maxNonBarrierMessages number of non-barrier messages tolerated before a
 *                              barrier is scheduled, must be positive
 * @param maxBarrierNanos maximum time between barriers in nanoseconds, must be positive
 */
OutboundQueueManager(final ConnectionAdapterImpl parent, final InetSocketAddress address, final T handler,
        final int maxNonBarrierMessages, final long maxBarrierNanos) {
    // Validate collaborators first, then the numeric limits, in the original order
    this.parent = Preconditions.checkNotNull(parent);
    this.handler = Preconditions.checkNotNull(handler);

    Preconditions.checkArgument(maxNonBarrierMessages > 0);
    this.maxNonBarrierMessages = maxNonBarrierMessages;

    Preconditions.checkArgument(maxBarrierNanos > 0);
    this.maxBarrierNanos = maxBarrierNanos;

    this.address = address;

    // The queue needs a back-reference to this manager; publish it once everything is set
    this.currentQueue = new StackedOutboundQueue(this);
    LOG.debug("Queue manager instantiated with queue {}", this.currentQueue);
    handler.onConnectionQueueChanged(this.currentQueue);
}
95 handler.onConnectionQueueChanged(null);
98 private void scheduleBarrierTimer(final long now) {
99 long next = lastBarrierNanos + maxBarrierNanos;
101 LOG.trace("Attempted to schedule barrier in the past, reset maximum)");
102 next = now + maxBarrierNanos;
105 final long delay = next - now;
106 LOG.trace("Scheduling barrier timer {}us from now", TimeUnit.NANOSECONDS.toMicros(delay));
107 parent.getChannel().eventLoop().schedule(barrierRunnable, next - now, TimeUnit.NANOSECONDS);
108 barrierTimerEnabled = true;
// Asks the current queue to reserve a barrier slot and commits a barrier request,
// built by the handler, under the reserved XID.
111 private void scheduleBarrierMessage() {
112 final Long xid = currentQueue.reserveBarrierIfNeeded();
// NOTE(review): the condition guarding this trace (and any early exit after it) is not
// visible in this listing; presumably it applies when no XID was reserved -- confirm.
114 LOG.trace("Queue {} already contains a barrier, not scheduling one", currentQueue);
// The third commitEntry argument is null; its meaning is defined by StackedOutboundQueue.
118 currentQueue.commitEntry(xid, handler.createBarrierRequest(xid), null);
119 LOG.trace("Barrier XID {} scheduled", xid);
124 * Invoked whenever a message comes in from the switch. Runs matching
125 * on all active queues in an attempt to complete a previous request.
127 * @param message Potential response message
128 * @return True if the message matched a previous request, false otherwise.
130 boolean onMessage(final OfHeader message) {
131 LOG.trace("Attempting to pair message {} to a request", message);
133 return currentQueue.pairRequest(message);
136 private void scheduleFlush() {
137 if (flushScheduled.compareAndSet(false, true)) {
138 LOG.trace("Scheduling flush task on channel {}", parent.getChannel());
139 parent.getChannel().eventLoop().execute(flushRunnable);
141 LOG.trace("Flush task is already present on channel {}", parent.getChannel());
146 * Periodic barrier check.
// Barrier timer callback. Marks the timer as no longer pending, then looks at the time
// elapsed since the last barrier and at the count of non-barrier messages written since.
148 protected void barrier() {
149 LOG.debug("Channel {} barrier timer expired", parent.getChannel());
150 barrierTimerEnabled = false;
// NOTE(review): the condition guarding this trace (and any early exit) is not visible in
// this listing; presumably it tests the shutdown state -- confirm.
152 LOG.trace("Channel shut down, not processing barrier");
// Elapsed time is taken as a difference of two nanoTime readings.
156 final long now = System.nanoTime();
157 final long sinceLast = now - lastBarrierNanos;
158 if (sinceLast >= maxBarrierNanos) {
159 LOG.debug("Last barrier at {} now {}, elapsed {}", lastBarrierNanos, now, sinceLast);
160 // FIXME: we should be tracking requests/responses instead of this
161 if (nonBarrierMessages == 0) {
162 LOG.trace("No messages written since last barrier, not issuing one");
// NOTE(review): presumably the else-branch of the check above; the delimiter is not visible.
164 scheduleBarrierMessage();
// Releases the flush-scheduled flag at the end of a flush pass and warns if the flag
// was found already cleared.
169 private void rescheduleFlush() {
171 * We are almost ready to terminate. This is a bit tricky, because
172 * we do not want to have a race window where a message would be
173 * stuck on the queue without a flush being scheduled.
175 * So we mark ourselves as not running and then re-check if a
176 * flush out is needed. That will re-synchronize with other threads
177 * such that only one flush is scheduled at any given time.
// compareAndSet(true, false) fails only if the flag was not set, which is unexpected here.
179 if (!flushScheduled.compareAndSet(true, false)) {
180 LOG.warn("Channel {} queue {} flusher found unscheduled", parent.getChannel(), this);
// NOTE(review): the re-check described in the comment above is not visible in this listing.
// Hands queued entries to the channel, flushes the channel, and at debug level reports
// how many entries were written and how long the pass took.
186 private void writeAndFlush() {
// The start timestamp doubles as the time reference passed to the queue.
187 final long start = System.nanoTime();
189 final int entries = currentQueue.writeEntries(parent.getChannel(), start);
// NOTE(review): any condition guarding the flush below is not visible in this listing.
191 LOG.debug("Flushing channel {}", parent.getChannel());
192 parent.getChannel().flush();
// The second nanoTime reading is taken only when debug logging is enabled.
195 if (LOG.isDebugEnabled()) {
196 final long stop = System.nanoTime();
197 LOG.debug("Flushed {} messages to channel {} in {}us", entries,
198 parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start));
203 * Perform a single flush operation. We keep it here so we do not generate
204 * synthetic accessors for private fields. Otherwise it could be moved into
205 * {@link #flushRunnable}.
// Single flush pass. One branch dequeues messages to the channel; the others handle
// completion of a shutdown that is in progress.
207 protected void flush() {
208 // If the channel is gone, just flush whatever is not completed
// NOTE(review): the condition of this first branch and the calls following the log
// statement are not visible in this listing.
210 LOG.debug("Dequeuing messages to channel {}", parent.getChannel());
// Once the queue reports its shutdown as finished, the handler is told the queue is gone.
213 } else if (currentQueue.finishShutdown()) {
214 handler.onConnectionQueueChanged(null);
215 LOG.debug("Channel {} shutdown complete", parent.getChannel());
// NOTE(review): presumably the final else-branch; anything after the log is not visible.
217 LOG.trace("Channel {} current queue not completely flushed yet", parent.getChannel());
223 * Schedule a queue flush if it is not empty and the channel is found
224 * to be writable. May only be called from Netty context.
// Decides whether a flush is warranted: the queue must need flushing, and either a
// shutdown is in progress or the channel is writable. Other cases are only logged.
226 private void conditionalFlush() {
227 if (currentQueue.needsFlush()) {
228 if (shuttingDown || parent.getChannel().isWritable()) {
// NOTE(review): the action taken in the branch above is not visible in this listing.
231 LOG.debug("Channel {} is not I/O ready, not scheduling a flush", parent.getChannel());
234 LOG.trace("Queue is empty, no flush needed");
// Netty callback for channel activation; delegates to the superclass first.
239 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
240 super.channelActive(ctx);
// NOTE(review): any statement following the super call is not visible in this listing.
244 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
246 * Tune channel write buffering. We increase the writability window
247 * to ensure we can flush an entire queue segment in one go. We definitely
248 * want to keep the difference above 64k, as that will ensure we use jam-packed
249 * TCP packets. UDP will fragment as appropriate.
251 ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
252 ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
254 super.handlerAdded(ctx);
// Netty callback for writability changes. After delegating to the superclass it tries to
// claim the flush-scheduled flag; if the flag is already held, the change is only logged.
258 public void channelWritabilityChanged(final ChannelHandlerContext ctx) throws Exception {
259 super.channelWritabilityChanged(ctx);
261 // A simple trade-off. While we could write things right away, if there is a task
262 // scheduled, let it have the work
263 if (flushScheduled.compareAndSet(false, true)) {
264 LOG.debug("Channel {} writability changed, invoking flush", parent.getChannel());
// NOTE(review): the flush invocation announced by the log above is not visible in this listing.
267 LOG.debug("Channel {} Writability changed, but task is already scheduled", parent.getChannel());
// Netty callback for channel deactivation. Delegates to the superclass, then starts the
// queue shutdown and logs how many entries the queue reported.
272 public void channelInactive(final ChannelHandlerContext ctx) throws Exception {
273 super.channelInactive(ctx);
275 LOG.debug("Channel {} initiating shutdown...", ctx.channel());
// NOTE(review): statements between the log above and the call below are not visible in
// this listing; presumably the shutdown flag is raised there -- confirm.
278 final long entries = currentQueue.startShutdown(ctx.channel());
279 LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
// Netty callback for an inbound message. Records that a read is in progress on the first
// call, then passes the message on via the superclass.
285 public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
286 // non-volatile read if we are called multiple times
287 if (!alreadyReading) {
288 alreadyReading = true;
// NOTE(review): any further statement inside this branch is not visible in this listing.
291 super.channelRead(ctx, msg);
// Netty callback marking the end of a read batch. Clears the in-progress marker set by
// channelRead() and then, under this object's monitor, deals with pending output.
295 public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception {
296 super.channelReadComplete(ctx);
297 alreadyReading = false;
300 // TODO: model this as an atomic gate. We need to sync on it to make sure
301 // that ensureFlushing() suppresses scheduling only if this barrier
302 // has not been crossed.
303 synchronized (this) {
304 // Run flush regardless of writability. This is not strictly required, as
305 // there may be a scheduled flush. Instead of canceling it, which is expensive,
306 // we'll steal its work. Note that more work may accumulate in the time window
307 // between now and when the task will run, so it may not be a no-op after all.
309 // The reason for this is to fill the output buffer before we go into selection
310 // phase. This will make sure the pipe is full (in which case our next wake up
311 // will be the queue becoming writable).
// NOTE(review): the statements around the log below, including the write it announces,
// are not visible in this listing.
315 LOG.debug("Opportunistic write on channel {}", parent.getChannel());
320 public String toString() {
321 return String.format("Channel %s queue [flushing=%s]", parent.getChannel(), flushScheduled.get());
// Entry point for requesting that queued messages get flushed. The visible part checks
// channel writability and synchronizes on this object, the same monitor that
// channelReadComplete() uses.
324 void ensureFlushing() {
325 // If the channel is not writable, there's no point in waking up,
326 // once we become writable, we will run a full flush
327 if (!parent.getChannel().isWritable()) {
// NOTE(review): the body of the branch above is not visible in this listing.
331 // We are currently reading something, just a quick sync to ensure we will in fact
// NOTE(review): the condition guarding the synchronized section and its contents are
// not visible in this listing.
334 synchronized (this) {
341 // Netty thread is outside our code, we need to schedule a flush
342 // to re-synchronize.
// NOTE(review): the scheduling call described above is not visible in this listing.
346 void onEchoRequest(final EchoRequestMessage message) {
347 final EchoReplyInput reply = new EchoReplyInputBuilder().setData(message.getData()).setVersion(message.getVersion()).setXid(message.getXid()).build();
348 parent.getChannel().writeAndFlush(reply);
352 * Write a message into the underlying channel.
354 * @param now Time reference for 'now'. We take this as an argument, as
355 * we need a timestamp to mark barrier messages we see swinging
356 * by. That timestamp does not need to be completely accurate,
357 * hence we use the flush start time. Alternative would be to
358 * measure System.nanoTime() for each barrier -- needlessly
// Wraps the message, writes it to the channel (without flushing), and updates the
// barrier bookkeeping.
361 void writeMessage(final OfHeader message, final long now) {
362 final Object wrapper;
// A null address selects the plain wrapper; otherwise the UDP wrapper carries the address.
363 if (address == null) {
364 wrapper = new MessageListenerWrapper(message, null);
366 wrapper = new UdpMessageListenerWrapper(message, null, address);
368 parent.getChannel().write(wrapper);
// A barrier resets the message counter and records the supplied time as the last barrier.
370 if (message instanceof BarrierInput) {
371 LOG.trace("Barrier message seen, resetting counters");
372 nonBarrierMessages = 0;
373 lastBarrierNanos = now;
// NOTE(review): presumably the else-branch for non-barrier messages; the delimiter is not
// visible in this listing.
375 nonBarrierMessages++;
// Hitting the message limit schedules a barrier at once; below it, the barrier timer is
// armed if it is not already pending.
376 if (nonBarrierMessages >= maxNonBarrierMessages) {
377 LOG.trace("Scheduled barrier request after {} non-barrier messages", nonBarrierMessages);
378 scheduleBarrierMessage();
379 } else if (!barrierTimerEnabled) {
380 scheduleBarrierTimer(now);